/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.scheduler;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.Connector;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.JobSchedulerUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.TypeConversionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Broker filter that intercepts sends carrying scheduling properties
 * (cron, period or delay) and hands them to a {@link JobScheduler} instead of
 * dispatching them immediately. It also handles scheduler management requests
 * (browse, remove, remove-all) addressed to the scheduler management
 * destination, and re-injects jobs into the broker when the scheduler fires
 * them via {@link #scheduledJob(String, ByteSequence)}.
 */
public class SchedulerBroker extends BrokerFilter implements JobListener {
    private static final Logger LOG = LoggerFactory.getLogger(SchedulerBroker.class);
    private static final IdGenerator ID_GENERATOR = new IdGenerator();

    /** Interval between repeated "store is full" log entries while a producer is blocked. */
    private static final long FULL_STORE_WARN_INTERVAL_MS = 30000L;

    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    private final AtomicBoolean started = new AtomicBoolean();
    private final WireFormat wireFormat = new OpenWireFormat();
    private final ConnectionContext context = new ConnectionContext();
    private final ProducerId producerId = new ProducerId();
    private final SystemUsage systemUsage;

    private final JobSchedulerStore store;
    private JobScheduler scheduler;

    /**
     * Creates the scheduler broker and prepares the internal connection
     * context used when fired jobs are sent to the next broker in the chain.
     *
     * @param brokerService supplies the system usage and the store OpenWire version
     * @param next the next broker in the filter chain
     * @param store the job scheduler store; when {@code null} no scheduler is ever obtained
     * @throws Exception declared for callers; thrown by any of the setup calls
     */
    public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
        super(next);

        this.store = store;
        this.producerId.setConnectionId(ID_GENERATOR.generateId());
        this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
        // Stub connection for the broker-internal context: there is no real
        // transport behind it, so we only get a response on unexpected error.
        this.context.setConnection(new Connection() {
            @Override
            public Connector getConnector() {
                return null;
            }

            @Override
            public void dispatchSync(Command message) {
                if (message instanceof ExceptionResponse) {
                    LOG.warn("Unexpected response: {}", message);
                }
            }

            @Override
            public void dispatchAsync(Command command) {
                if (command instanceof ExceptionResponse) {
                    LOG.warn("Unexpected response: {}", command);
                }
            }

            @Override
            public Response service(Command command) {
                return null;
            }

            @Override
            public void serviceException(Throwable error) {
                LOG.warn("Unexpected exception: {}", error, error);
            }

            @Override
            public boolean isSlow() {
                return false;
            }

            @Override
            public boolean isBlocked() {
                return false;
            }

            @Override
            public boolean isConnected() {
                return false;
            }

            @Override
            public boolean isActive() {
                return false;
            }

            @Override
            public int getDispatchQueueSize() {
                return 0;
            }

            @Override
            public ConnectionStatistics getStatistics() {
                return null;
            }

            @Override
            public boolean isManageable() {
                return false;
            }

            @Override
            public String getRemoteAddress() {
                return null;
            }

            @Override
            public void serviceExceptionAsync(IOException e) {
                LOG.warn("Unexpected async ioexception: {}", e, e);
            }

            @Override
            public String getConnectionId() {
                return null;
            }

            @Override
            public boolean isNetworkConnection() {
                return false;
            }

            @Override
            public boolean isFaultTolerantConnection() {
                return false;
            }

            @Override
            public void updateClient(ConnectionControl control) {}

            @Override
            public int getActiveTransactionCount() {
                return 0;
            }

            @Override
            public Long getOldestActiveTransactionDuration() {
                return null;
            }

            @Override
            public void start() throws Exception {}

            @Override
            public void stop() throws Exception {}
        });
        this.context.setBroker(next);
        this.systemUsage = brokerService.getSystemUsage();

        wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
    }

    /**
     * Returns a new {@link JobSchedulerFacade} wrapping this broker.
     *
     * @return a facade over this broker's scheduler
     * @throws Exception declared for callers
     */
    public synchronized JobScheduler getJobScheduler() throws Exception {
        return new JobSchedulerFacade(this);
    }

    /**
     * Marks this broker as started, obtains the internal scheduler (which
     * starts dispatching) and then starts the next broker in the chain.
     */
    @Override
    public void start() throws Exception {
        this.started.set(true);
        getInternalScheduler();
        super.start();
    }

    /**
     * Stops the store and detaches from the scheduler, if this broker was
     * started, and then stops the next broker in the chain.
     */
    @Override
    public void stop() throws Exception {
        if (this.started.compareAndSet(true, false)) {

            if (this.store != null) {
                this.store.stop();
            }
            if (this.scheduler != null) {
                this.scheduler.removeListener(this);
                this.scheduler = null;
            }
        }
        super.stop();
    }

    /**
     * Routes an incoming send to one of three paths:
     * <ul>
     * <li>messages addressed to the scheduler management destination are
     * treated as management requests and are not forwarded;</li>
     * <li>messages carrying a cron, period or delay property and no scheduled
     * id are stored in the job scheduler (after commit when the connection is
     * in a transaction);</li>
     * <li>everything else is passed to the next broker unchanged.</li>
     * </ul>
     *
     * @param producerExchange the exchange of the sending producer
     * @param messageSend the message being sent
     * @throws IOException if the connection is stopping while the send is
     *         blocked waiting for job scheduler store space
     * @throws IllegalArgumentException if a numeric scheduling property cannot
     *         be converted
     * @throws IllegalStateException if a scheduler is required but none is available
     */
    @Override
    public void send(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();

        final String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
        final Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
        final Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
        final Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);

        if (isManagementMessage(messageSend)) {
            handleManagementRequest(context, messageSend, jobId);
        } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
            waitForJobSchedulerSpace(context, messageSend);

            if (context.isInTransaction()) {
                // Defer scheduling until the transaction commits.
                context.getTransaction().addSynchronization(new Synchronization() {
                    @Override
                    public void afterCommit() throws Exception {
                        doSchedule(messageSend, cronValue, periodValue, delayValue);
                    }
                });
            } else {
                doSchedule(messageSend, cronValue, periodValue, delayValue);
            }
        } else {
            super.send(producerExchange, messageSend);
        }
    }

    /**
     * Tells whether the message's destination name starts (ignoring case) with
     * the scheduler management destination name.
     */
    private static boolean isManagementMessage(Message messageSend) {
        String physicalName = messageSend.getDestination().getPhysicalName();
        String managementName = ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION;
        return physicalName.regionMatches(true, 0, managementName, 0, managementName.length());
    }

    /**
     * Executes a scheduler management request: browse (requires a reply-to
     * destination), remove (requires a job id) or remove-all. Browse and
     * remove-all are restricted to a time window when both the start and end
     * time properties are present. Requests without an action, or with an
     * action that matches nothing, are silently ignored.
     */
    private void handleManagementRequest(ConnectionContext connectionContext, Message messageSend, String jobId)
        throws Exception {
        // Obtained up front so the scheduler is lazily initialised even for ignored requests.
        JobScheduler scheduler = getInternalScheduler();
        ActiveMQDestination replyTo = messageSend.getReplyTo();

        String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
        if (action == null) {
            return;
        }

        Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
        Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
        boolean hasTimeWindow = startTime != null && endTime != null;

        if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
            requireAvailable(scheduler);
            if (hasTimeWindow) {
                long start = toLong(startTime, ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
                long finish = toLong(endTime, ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);

                for (Job job : scheduler.getAllJobs(start, finish)) {
                    sendScheduledJob(connectionContext, job, replyTo);
                }
            } else {
                for (Job job : scheduler.getAllJobs()) {
                    sendScheduledJob(connectionContext, job, replyTo);
                }
            }
        }

        if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
            requireAvailable(scheduler).remove(jobId);
        } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
            requireAvailable(scheduler);
            if (hasTimeWindow) {
                long start = toLong(startTime, ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
                long finish = toLong(endTime, ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);

                scheduler.removeAllJobs(start, finish);
            } else {
                scheduler.removeAllJobs();
            }
        }
    }

    /**
     * Blocks the sender while the job scheduler store reports itself full,
     * logging periodically, and aborts with an {@link IOException} once the
     * connection is stopping. Returns immediately when no job scheduler usage
     * is configured or the store has room.
     */
    private void waitForJobSchedulerSpace(ConnectionContext connectionContext, Message messageSend) throws Exception {
        JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
        if (usage == null || !usage.isFull()) {
            return;
        }

        final String logMessage = "Job Scheduler Store is Full (" +
            usage.getPercentUsage() + "% of " + usage.getLimit() +
            "). Stopping producer (" + messageSend.getProducerId() +
            ") to prevent flooding of the job scheduler store." +
            " See http://activemq.apache.org/producer-flow-control.html for more info";

        long start = System.currentTimeMillis();
        long nextWarn = start;
        while (!usage.waitForSpace(1000)) {
            if (connectionContext.getStopping().get()) {
                throw new IOException("Connection closed, send aborted.");
            }

            long now = System.currentTimeMillis();
            if (now >= nextWarn) {
                LOG.info("{}: {} (blocking for: {}s)", new Object[]{ usage, logMessage, (now - start) / 1000 });
                nextWarn = now + FULL_STORE_WARN_INTERVAL_MS;
            }
        }
    }

    /**
     * Marshals a copy of the message (with its transaction id cleared) and
     * stores it in the scheduler under the message id, together with the cron
     * entry, delay, period and repeat count taken from the scheduling
     * properties. Missing properties default to an empty cron entry and zero.
     */
    private void doSchedule(Message messageSend, Object cronValue, Object periodValue, Object delayValue) throws Exception {
        long delay = 0;
        long period = 0;
        int repeat = 0;
        String cronEntry = "";

        // clear transaction context
        Message msg = messageSend.copy();
        msg.setTransactionId(null);
        ByteSequence packet = wireFormat.marshal(msg);
        if (cronValue != null) {
            cronEntry = cronValue.toString();
        }
        if (periodValue != null) {
            period = toLong(periodValue, ScheduledMessage.AMQ_SCHEDULED_PERIOD);
        }
        if (delayValue != null) {
            delay = toLong(delayValue, ScheduledMessage.AMQ_SCHEDULED_DELAY);
        }
        Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
        if (repeatValue != null) {
            repeat = toInt(repeatValue, ScheduledMessage.AMQ_SCHEDULED_REPEAT);
        }

        requireAvailable(getInternalScheduler()).schedule(msg.getMessageId().toString(),
            new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat);
    }

    /**
     * Callback invoked when a stored job fires. Unmarshals the message, strips
     * the scheduling properties so it is not rescheduled elsewhere, tags it
     * with the job id, refreshes its timestamp/expiration and sends it to the
     * next broker. Any failure is logged and not propagated.
     *
     * @param id the id of the job that fired
     * @param job the marshalled message stored for the job
     */
    @Override
    public void scheduledJob(String id, ByteSequence job) {
        ByteSequence packet = new ByteSequence(job.getData(), job.getOffset(), job.getLength());
        try {
            Message messageSend = (Message) wireFormat.unmarshal(packet);
            messageSend.setOriginalTransactionId(null);
            Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
            Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
            String cronStr = cronValue != null ? cronValue.toString() : null;
            int repeat = 0;
            if (repeatValue != null) {
                repeat = toInt(repeatValue, ScheduledMessage.AMQ_SCHEDULED_REPEAT);
            }

            if (repeat != 0 || (cronStr != null && cronStr.length() > 0)) {
                // create a unique id - the original message could be sent
                // lots of times
                messageSend.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
            }

            // Add the jobId as a property
            messageSend.setProperty("scheduledJobId", id);

            // if this goes across a network - we don't want it rescheduled
            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);

            refreshTimestampAndExpiration(messageSend);

            // Repackage the message contents prior to send now that all updates are complete.
            messageSend.beforeMarshall(wireFormat);

            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
            producerExchange.setConnectionContext(context);
            producerExchange.setMutable(true);
            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
            super.send(producerExchange, messageSend);
        } catch (Exception e) {
            LOG.error("Failed to send scheduled message {}", id, e);
        }
    }

    /**
     * For a message that has both a timestamp and an expiration, moves the
     * timestamp to "now" and shifts the expiration so the original
     * time-to-live (expiration minus timestamp) is preserved. Nothing changes
     * unless the shifted expiration is later than the old one.
     */
    private static void refreshTimestampAndExpiration(Message messageSend) {
        long oldTimestamp = messageSend.getTimestamp();
        long oldExpiration = messageSend.getExpiration();
        if (oldTimestamp <= 0 || oldExpiration <= 0) {
            return;
        }

        long newTimeStamp = System.currentTimeMillis();
        long timeToLive = oldExpiration - oldTimestamp;
        long expiration = timeToLive + newTimeStamp;

        if (expiration > oldExpiration) {
            if (timeToLive > 0 && expiration > 0) {
                messageSend.setExpiration(expiration);
            }
            messageSend.setTimestamp(newTimeStamp);
            LOG.debug("Set message {} timestamp from {} to {}", new Object[]{ messageSend.getMessageId(), oldTimestamp, newTimeStamp });
        }
    }

    /**
     * Returns the scheduler backing this broker, lazily obtaining the "JMS"
     * scheduler from the store, registering this broker as its listener and
     * starting dispatch on first use.
     *
     * @return the scheduler, or {@code null} when this broker is not started
     *         or no store was supplied
     * @throws Exception if the store fails to provide or start the scheduler
     */
    protected synchronized JobScheduler getInternalScheduler() throws Exception {
        if (this.started.get()) {
            if (this.scheduler == null && store != null) {
                this.scheduler = store.getJobScheduler("JMS");
                this.scheduler.addListener(this);
                this.scheduler.startDispatching();
            }
            return this.scheduler;
        }
        return null;
    }

    /**
     * Sends a non-persistent copy of a stored job's message to the given
     * reply destination, marked with the advisory message type and carrying
     * the job id in the {@code scheduledJobId} property. Producer flow control
     * on the supplied context is disabled for the duration of the send and
     * restored afterwards. Failures are logged and not propagated.
     *
     * @param context the connection context to send with
     * @param job the stored job to report
     * @param replyTo the destination that receives the job message
     * @throws Exception declared for subclasses; failures while building or
     *         sending the message are caught and logged here
     */
    protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo) throws Exception {

        ByteSequence packet = new ByteSequence(job.getPayload());
        try {
            Message msg = (Message) this.wireFormat.unmarshal(packet);
            msg.setOriginalTransactionId(null);
            msg.setPersistent(false);
            msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
            msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
            msg.setDestination(replyTo);
            msg.setResponseRequired(false);
            msg.setProducerId(this.producerId);

            // Add the jobId as a property
            msg.setProperty("scheduledJobId", job.getJobId());

            final boolean originalFlowControl = context.isProducerFlowControl();
            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
            producerExchange.setConnectionContext(context);
            producerExchange.setMutable(true);
            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
            try {
                context.setProducerFlowControl(false);
                this.next.send(producerExchange, msg);
            } finally {
                context.setProducerFlowControl(originalFlowControl);
            }
        } catch (Exception e) {
            LOG.error("Failed to send scheduled message {}", job.getJobId(), e);
        }
    }

    /**
     * Fails fast with a descriptive error instead of a bare
     * {@link NullPointerException} when no scheduler could be obtained.
     */
    private static JobScheduler requireAvailable(JobScheduler scheduler) {
        if (scheduler == null) {
            throw new IllegalStateException(
                "Job scheduler is not available: the scheduler broker is not started or has no job scheduler store");
        }
        return scheduler;
    }

    /**
     * Converts a message property value to a {@code long}, rejecting values
     * for which the conversion produces no result.
     */
    private static long toLong(Object value, String propertyName) {
        Object converted = TypeConversionSupport.convert(value, Long.class);
        if (converted == null) {
            throw new IllegalArgumentException(
                "Property " + propertyName + " cannot be converted to a long: " + value);
        }
        return (Long) converted;
    }

    /**
     * Converts a message property value to an {@code int}, rejecting values
     * for which the conversion produces no result.
     */
    private static int toInt(Object value, String propertyName) {
        Object converted = TypeConversionSupport.convert(value, Integer.class);
        if (converted == null) {
            throw new IllegalArgumentException(
                "Property " + propertyName + " cannot be converted to an int: " + value);
        }
        return (Integer) converted;
    }
}