KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > cjdbc > controller > scheduler > AbstractScheduler


1 /**
2  * C-JDBC: Clustered JDBC.
3  * Copyright (C) 2002-2005 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Contact: c-jdbc@objectweb.org
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation; either version 2.1 of the License, or any later
10  * version.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): Jean-Bernard van Zuylen.
23  */

24
25 package org.objectweb.cjdbc.controller.scheduler;
26
27 import java.sql.SQLException JavaDoc;
28
29 import org.objectweb.cjdbc.common.exceptions.RollbackException;
30 import org.objectweb.cjdbc.common.i18n.Translate;
31 import org.objectweb.cjdbc.common.log.Trace;
32 import org.objectweb.cjdbc.common.sql.AbstractRequest;
33 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
34 import org.objectweb.cjdbc.common.sql.SelectRequest;
35 import org.objectweb.cjdbc.common.sql.StoredProcedure;
36 import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema;
37 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
38 import org.objectweb.cjdbc.common.xml.XmlComponent;
39 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
40
41 /**
42  * The Request Scheduler should schedule the request according to a given
43  * policy.
44  * <p>
45  * The requests comes from the Request Controller and are sent later to the next
46  * ccontroller omponents (cache and load balancer).
47  *
48  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
49  * @author <a HREF="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
50  * </a>
51  * @version 1.0
52  */

53 public abstract class AbstractScheduler implements XmlComponent
54 {
55
56   //
57
// How the code is organized ?
58
//
59
// 1. Member variables
60
// 2. Constructor
61
// 3. Getter/Setter (possibly in alphabetical order)
62
// 4. Request handling
63
// 5. Transaction management
64
// 6. Checkpoint management
65
// 7. Debug/Monitoring
66
//
67

68   protected int raidbLevel;
69   protected int parsingGranularity;
70
71   // Transaction management
72
private long tid;
73   private int sid;
74   private boolean suspendedTransactions = false;
75   private int pendingTransactions;
76   private Object JavaDoc transactionSync = new Object JavaDoc();
77   private Object JavaDoc endOfCurrentTransactions = new Object JavaDoc();
78
79   // Writes management
80
private boolean suspendedWrites = false;
81   private int pendingWrites;
82   private Object JavaDoc writesSync = new Object JavaDoc();
83   private Object JavaDoc endOfCurrentWrites = new Object JavaDoc();
84
85   protected static Trace logger = Trace
86                                                       .getLogger("org.objectweb.cjdbc.controller.scheduler");
87
88   // Monitoring values
89
private int numberRead = 0;
90   private int numberWrite = 0;
91
92   //
93
// Constructor
94
//
95

96   /**
97    * Default scheduler to assign scheduler RAIDb level, needed granularity and
98    * SQL macro handling (on the fly instanciation of NOW(), RAND(), ...).
99    *
100    * @param raidbLevel RAIDb level of this scheduler
101    * @param parsingGranularity Parsing granularity needed by the scheduler
102    */

103   public AbstractScheduler(int raidbLevel, int parsingGranularity)
104   {
105     this.raidbLevel = raidbLevel;
106     this.parsingGranularity = parsingGranularity;
107     this.tid = 0;
108     this.sid = 0;
109     this.pendingTransactions = 0;
110     this.pendingWrites = 0;
111   }
112
113   //
114
// Getter/Setter methods
115
//
116

117   /**
118    * Initialize the transaction id with the given value (usually retrieved from
119    * the recovery log).
120    *
121    * @param transactionId new current transaction identifier
122    */

123   public final void initializeTransactionId(long transactionId)
124   {
125     this.tid = transactionId;
126   }
127
128   /**
129    * Get the needed query parsing granularity.
130    *
131    * @return needed query parsing granularity
132    */

133   public final int getParsingGranularity()
134   {
135     return parsingGranularity;
136   }
137
138   /**
139    * Set the needed query parsing granularity.
140    *
141    * @param parsingGranularity Parsing granularity needed by the scheduler
142    */

143   public final void setParsingGranularity(int parsingGranularity)
144   {
145     this.parsingGranularity = parsingGranularity;
146   }
147
148   /**
149    * Returns the number of pending writes.
150    *
151    * @return int
152    */

153   public final int getPendingWrites()
154   {
155     return pendingWrites;
156   }
157
158   /**
159    * Returns the RAIDbLevel.
160    *
161    * @return int
162    */

163   public final int getRAIDbLevel()
164   {
165     return raidbLevel;
166   }
167
168   /**
169    * Sets the RAIDb level.
170    *
171    * @param raidbLevel The RAIDbLevel to set
172    */

173   public final void setRAIDbLevel(int raidbLevel)
174   {
175     this.raidbLevel = raidbLevel;
176   }
177
178   /**
179    * Sets the <code>DatabaseSchema</code> of the current virtual database.
180    * This is only needed by some schedulers that will have to define their own
181    * scheduler schema
182    *
183    * @param dbs a <code>DatabaseSchema</code> value
184    * @see org.objectweb.cjdbc.controller.scheduler.schema.SchedulerDatabaseSchema
185    */

186   public void setDatabaseSchema(DatabaseSchema dbs)
187   {
188     if (logger.isInfoEnabled())
189       logger.info(Translate.get("scheduler.doesnt.support.schemas"));
190   }
191
192   /**
193    * Merge the given <code>DatabaseSchema</code> with the current one.
194    *
195    * @param dbs a <code>DatabaseSchema</code> value
196    * @see org.objectweb.cjdbc.controller.scheduler.schema.SchedulerDatabaseSchema
197    */

198   public void mergeDatabaseSchema(DatabaseSchema dbs)
199   {
200     logger.info(Translate.get("scheduler.doesnt.support.schemas"));
201   }
202
203   /**
204    * Increments the savepoint id for un-named savepoints
205    *
206    * @return the next savepoint Id
207    */

208   public synchronized int incrementSavepointId()
209   {
210     sid++;
211     return sid;
212   }
213
214   //
215
// Request Scheduling
216
//
217

218   /**
219    * Schedule a read request (implementation specific). This method blocks until
220    * the read can be executed.
221    *
222    * @param request Select request to schedule (SQL macros are already handled
223    * if needed)
224    * @exception SQLException if a timeout occurs
225    */

226   public abstract void scheduleReadRequest(SelectRequest request)
227       throws SQLException JavaDoc;
228
229   /**
230    * Notify the completion of a read statement.
231    *
232    * @param request the completed request
233    */

234   public abstract void readCompletedNotify(SelectRequest request);
235
236   /**
237    * Notify the completion of a read statement.
238    *
239    * @param request the completed request
240    */

241   public final void readCompleted(SelectRequest request)
242   {
243     numberRead++;
244     this.readCompletedNotify(request);
245   }
246
247   /**
248    * Schedule a write request. This method blocks if the writes are suspended.
249    * Then the number of pending writes is updated and the implementation
250    * specific scheduleNonSuspendedWriteRequest function is called. SQL macros
251    * are replaced in the request if the scheduler has needSQLMacroHandling set
252    * to true.
253    *
254    * @param request Write request to schedule
255    * @exception SQLException if a timeout occurs
256    * @exception RollbackException if an error occurs
257    * @see #scheduleNonSuspendedWriteRequest(AbstractWriteRequest)
258    */

259   public final void scheduleWriteRequest(AbstractWriteRequest request)
260       throws SQLException JavaDoc, RollbackException
261   {
262     suspendWriteIfNeeded(request);
263     scheduleNonSuspendedWriteRequest(request);
264   }
265
266   /**
267    * Schedule a write request (implementation specific). This method blocks
268    * until the request can be executed.
269    *
270    * @param request Write request to schedule (SQL macros are already handled if
271    * needed)
272    * @exception SQLException if a timeout occurs
273    * @exception RollbackException if the transaction must be rollbacked
274    */

275   public abstract void scheduleNonSuspendedWriteRequest(
276       AbstractWriteRequest request) throws SQLException JavaDoc, RollbackException;
277
278   /**
279    * Notify the completion of a write statement.
280    * <p>
281    * This method updates the number of pending writes and calls the
282    * implementation specific notifyWriteCompleted function.
283    * <p>
284    * Finally, the suspendWrites() function is notified if needed.
285    *
286    * @param request the completed request
287    * @see #notifyWriteCompleted(AbstractWriteRequest)
288    * @see #suspendWrites()
289    */

290   public final void writeCompleted(AbstractWriteRequest request)
291   {
292     synchronized (writesSync)
293     {
294       pendingWrites--;
295
296       if (logger.isDebugEnabled())
297         logger.debug("Write completed, remaining pending writes: "
298             + pendingWrites);
299
300       notifyWriteCompleted(request);
301
302       // It this is the last write to complete and writes are
303
// suspended we have to notify suspendedWrites()
304
if (suspendedWrites && (pendingWrites == 0))
305       {
306         synchronized (endOfCurrentWrites)
307         {
308           endOfCurrentWrites.notifyAll();
309         }
310       }
311     }
312     numberWrite++;
313   }
314
315   /**
316    * Notify the completion of a write statement. This method does not need to be
317    * synchronized, it is enforced by the caller.
318    *
319    * @param request the completed request
320    * @see #writeCompleted(AbstractWriteRequest)
321    */

322   public abstract void notifyWriteCompleted(AbstractWriteRequest request);
323
324   /**
325    * Schedule a write request. This method blocks if the writes are suspended.
326    * Then the number of pending writes is updated and the implementation
327    * specific scheduleNonSuspendedWriteRequest function is called. SQL macros
328    * are replaced in the request if the scheduler has needSQLMacroHandling set
329    * to true.
330    *
331    * @param proc Stored procedure to schedule
332    * @exception SQLException if a timeout occurs
333    * @exception RollbackException if an error occurs
334    * @see #scheduleNonSuspendedStoredProcedure(StoredProcedure)
335    */

336   public final void scheduleStoredProcedure(StoredProcedure proc)
337       throws SQLException JavaDoc, RollbackException
338   {
339     suspendWriteIfNeeded(proc);
340     scheduleNonSuspendedStoredProcedure(proc);
341   }
342
343   /**
344    * Schedule a write request (implementation specific). This method blocks
345    * until the request can be executed.
346    *
347    * @param proc Stored procedure to schedule
348    * @exception SQLException if a timeout occurs
349    * @exception RollbackException if the transaction must be rollbacked
350    */

351   public abstract void scheduleNonSuspendedStoredProcedure(StoredProcedure proc)
352       throws SQLException JavaDoc, RollbackException;
353
354   /**
355    * Notify the completion of a stored procedure.
356    * <p>
357    * This method updates the number of pending writes and calls the
358    * implementation specific notifyStoredProcedureCompleted function.
359    * <p>
360    * Finally, the suspendWrites() function is notified if needed.
361    *
362    * @param proc the completed stored procedure
363    * @see #notifyStoredProcedureCompleted(StoredProcedure)
364    * @see #suspendWrites()
365    */

366   public final void storedProcedureCompleted(StoredProcedure proc)
367   {
368     synchronized (writesSync)
369     {
370       pendingWrites--;
371
372       if (logger.isDebugEnabled())
373         logger.debug("Stored procedure completed, remaining pending writes: "
374             + pendingWrites);
375
376       notifyStoredProcedureCompleted(proc);
377
378       // It this is the last write to complete and writes are
379
// suspended we have to notify suspendedWrites()
380
if (suspendedWrites && (pendingWrites == 0))
381       {
382         synchronized (endOfCurrentWrites)
383         {
384           endOfCurrentWrites.notifyAll();
385         }
386       }
387     }
388     numberWrite++;
389   }
390
391   /**
392    * Notify the completion of a stored procedure. This method does not need to
393    * be synchronized, it is enforced by the caller.
394    *
395    * @param proc the completed stored procedure
396    * @see #storedProcedureCompleted(StoredProcedure)
397    */

398   public abstract void notifyStoredProcedureCompleted(StoredProcedure proc);
399
400   /**
401    * Suspend write requests if suspendedWrites is active.
402    *
403    * @param request the request to suspend (a write request or a stored
404    * procedure)
405    * @throws SQLException if the request timeout has expired
406    */

407   private void suspendWriteIfNeeded(AbstractRequest request)
408       throws SQLException JavaDoc
409   {
410     synchronized (writesSync)
411     {
412       if (suspendedWrites)
413       {
414         try
415         {
416           // Wait on writesSync
417
int timeout = request.getTimeout();
418           if (timeout > 0)
419           {
420             long start = System.currentTimeMillis();
421             long lTimeout = timeout * 1000;
422             writesSync.wait(lTimeout);
423             long end = System.currentTimeMillis();
424             int remaining = (int) (lTimeout - (end - start));
425             if (remaining > 0)
426               request.setTimeout(remaining);
427             else
428             {
429               String JavaDoc msg = Translate.get("scheduler.request.timeout",
430                   new String JavaDoc[]{String.valueOf(request.getId()),
431                       String.valueOf(request.getTimeout())});
432               logger.warn(msg);
433               throw new SQLException JavaDoc(msg);
434             }
435           }
436           else
437             this.writesSync.wait();
438         }
439         catch (InterruptedException JavaDoc e)
440         {
441           String JavaDoc msg = Translate.get("scheduler.request.timeout.failed", e);
442           logger.warn(msg);
443           throw new SQLException JavaDoc(msg);
444         }
445       }
446       pendingWrites++;
447
448       if (logger.isDebugEnabled())
449         logger.debug("Schedule " + request.getSQL()
450             + " - Current pending writes: " + pendingWrites);
451     }
452   }
453
454   //
455
// Transaction management
456
//
457

458   /**
459    * Begin a new transaction and return the corresponding transaction
460    * identifier. This method is called from the driver when setAutoCommit(false)
461    * is called.
462    *
463    * @param tm The transaction marker metadata
464    * @return the transaction identifier
465    * @throws SQLException if an error occurs
466    */

467   public final long begin(TransactionMarkerMetaData tm) throws SQLException JavaDoc
468   {
469     // Check if writes are suspended
470
synchronized (writesSync)
471     {
472       if (suspendedWrites)
473       {
474         try
475         {
476           // Wait on writesSync
477
long timeout = tm.getTimeout();
478           if (timeout > 0)
479           {
480             long start = System.currentTimeMillis();
481             writesSync.wait(timeout);
482             long end = System.currentTimeMillis();
483             long remaining = timeout - (end - start);
484             if (remaining > 0)
485               tm.setTimeout(remaining);
486             else
487             {
488               String JavaDoc msg = Translate.get("scheduler.begin.timeout.writeSync");
489               logger.warn(msg);
490               throw new SQLException JavaDoc(msg);
491             }
492           }
493           else
494             writesSync.wait();
495         }
496         catch (InterruptedException JavaDoc e)
497         {
498           String JavaDoc msg = Translate.get("scheduler.begin.timeout.writeSync")
499               + " (" + e + ")";
500           logger.error(msg);
501           throw new SQLException JavaDoc(msg);
502         }
503       }
504       pendingWrites++;
505
506       if (logger.isDebugEnabled())
507         logger.debug("Begin scheduled - current pending writes: "
508             + pendingWrites);
509     }
510
511     // Check if transactions are suspended
512
synchronized (transactionSync)
513     {
514       if (suspendedTransactions)
515         try
516         {
517           // Wait on transactionSync
518
long timeout = tm.getTimeout();
519           if (timeout > 0)
520           {
521             long start = System.currentTimeMillis();
522             transactionSync.wait(timeout);
523             long end = System.currentTimeMillis();
524             long remaining = timeout - (end - start);
525             if (remaining > 0)
526               tm.setTimeout(remaining);
527             else
528             {
529               String JavaDoc msg = Translate
530                   .get("scheduler.begin.timeout.transactionSync");
531               logger.warn(msg);
532               throw new SQLException JavaDoc(msg);
533             }
534           }
535           else
536             transactionSync.wait();
537         }
538         catch (InterruptedException JavaDoc e)
539         {
540           String JavaDoc msg = Translate.get("scheduler.begin.timeout.transactionSync")
541               + " (" + e + ")";
542           logger.error(msg);
543           throw new SQLException JavaDoc(msg);
544         }
545       tid++;
546       pendingTransactions++;
547
548       if (logger.isDebugEnabled())
549         logger.debug("Begin scheduled - current pending transactions: "
550             + pendingTransactions);
551       return tid;
552     }
553   }
554
555   /**
556    * Notify the completion of a begin command.
557    *
558    * @param transactionId of the completed begin
559    */

560   public final void beginCompleted(long transactionId)
561   {
562     // Take care of suspended write
563
synchronized (writesSync)
564     {
565       pendingWrites--;
566
567       if (logger.isDebugEnabled())
568         logger.debug("Begin completed, remaining pending writes: "
569             + pendingWrites);
570
571       // It this is the last write to complete and writes are
572
// suspended we have to notify suspendedWrites()
573
if (suspendedWrites && (pendingWrites == 0))
574       {
575         synchronized (endOfCurrentWrites)
576         {
577           endOfCurrentWrites.notifyAll();
578         }
579       }
580     }
581   }
582
583   /**
584    * Commit a transaction.
585    * <p>
586    * Calls the implementation specific commitTransaction()
587    *
588    * @param tm The transaction marker metadata
589    * @throws SQLException if an error occurs
590    * @see #commitTransaction(long)
591    */

592   public final void commit(TransactionMarkerMetaData tm) throws SQLException JavaDoc
593   {
594     // Check if writes are suspended
595
synchronized (writesSync)
596     {
597       if (suspendedWrites)
598       {
599         try
600         {
601           // Wait on writesSync
602
long timeout = tm.getTimeout();
603           if (timeout > 0)
604           {
605             long start = System.currentTimeMillis();
606             writesSync.wait(timeout);
607             long end = System.currentTimeMillis();
608             long remaining = timeout - (end - start);
609             if (remaining > 0)
610               tm.setTimeout(remaining);
611             else
612             {
613               String JavaDoc msg = Translate.get("scheduler.commit.timeout.writeSync");
614               logger.warn(msg);
615               throw new SQLException JavaDoc(msg);
616             }
617           }
618           else
619             writesSync.wait();
620         }
621         catch (InterruptedException JavaDoc e)
622         {
623           String JavaDoc msg = Translate.get("scheduler.commit.timeout.writeSync")
624               + " (" + e + ")";
625           logger.error(msg);
626           throw new SQLException JavaDoc(msg);
627         }
628       }
629       pendingWrites++;
630
631       if (logger.isDebugEnabled())
632         logger.debug("Commit scheduled - current pending writes: "
633             + pendingWrites);
634     }
635     commitTransaction(tm.getTransactionId());
636   }
637
638   /**
639    * Commit a transaction given its id.
640    *
641    * @param transactionId the transaction id
642    */

643   protected abstract void commitTransaction(long transactionId);
644
645   /**
646    * Notify the completion of a commit command.
647    *
648    * @param transactionId of the completed commit
649    */

650   public final void commitCompleted(long transactionId)
651   {
652     // Take care of suspended transactions
653
synchronized (transactionSync)
654     {
655       pendingTransactions--;
656
657       if (logger.isDebugEnabled())
658         logger.debug("Commit completed, remaining pending transactions: "
659             + pendingTransactions);
660
661       // If it is the last pending transaction to complete and we
662
// are waiting for pending transactions to complete, then wake
663
// up suspendNewTransactionsForCheckpoint()
664
if (suspendedTransactions && (pendingTransactions == 0))
665       {
666         synchronized (endOfCurrentTransactions)
667         {
668           endOfCurrentTransactions.notifyAll();
669         }
670       }
671     }
672     // Take care of suspended write
673
synchronized (writesSync)
674     {
675       pendingWrites--;
676
677       if (logger.isDebugEnabled())
678         logger.debug("Commit completed, remaining pending writes: "
679             + pendingWrites);
680
681       // It this is the last write to complete and writes are
682
// suspended we have to notify suspendedWrites()
683
if (suspendedWrites && (pendingWrites == 0))
684       {
685         synchronized (endOfCurrentWrites)
686         {
687           endOfCurrentWrites.notifyAll();
688         }
689       }
690     }
691   }
692
693   /**
694    * Rollback a transaction.
695    * <p>
696    * Calls the implementation specific rollbackTransaction()
697    *
698    * @param tm The transaction marker metadata
699    * @exception SQLException if an error occurs
700    * @see #rollbackTransaction(long)
701    */

702   public final void rollback(TransactionMarkerMetaData tm) throws SQLException JavaDoc
703   {
704     // Check if writes are suspended
705
synchronized (writesSync)
706     {
707       if (suspendedWrites)
708       {
709         try
710         {
711           // Wait on writesSync
712
long timeout = tm.getTimeout();
713           if (timeout > 0)
714           {
715             long start = System.currentTimeMillis();
716             writesSync.wait(timeout);
717             long end = System.currentTimeMillis();
718             long remaining = timeout - (end - start);
719             if (remaining > 0)
720               tm.setTimeout(remaining);
721             else
722             {
723               String JavaDoc msg = Translate
724                   .get("scheduler.rollback.timeout.writeSync");
725               logger.warn(msg);
726               throw new SQLException JavaDoc(msg);
727             }
728           }
729           else
730             writesSync.wait();
731         }
732         catch (InterruptedException JavaDoc e)
733         {
734           String JavaDoc msg = Translate.get("scheduler.rollback.timeout.writeSync")
735               + " (" + e + ")";
736           logger.error(msg);
737           throw new SQLException JavaDoc(msg);
738         }
739       }
740       pendingWrites++;
741
742       if (logger.isDebugEnabled())
743         logger.debug("Rollback scheduled - current pending writes: "
744             + pendingWrites);
745     }
746     rollbackTransaction(tm.getTransactionId());
747   }
748
749   /**
750    * Rollback a transaction to a savepoint.
751    * <p>
752    * Calls the implementation specific rollbackTransaction()
753    *
754    * @param tm transaction marker metadata
755    * @param savepointName name of the savepoint
756    * @throws SQLException if an error occurs
757    */

758   public final void rollback(TransactionMarkerMetaData tm, String JavaDoc savepointName)
759       throws SQLException JavaDoc
760   {
761     // Check if writes are suspended
762
synchronized (writesSync)
763     {
764       if (suspendedWrites)
765         try
766         {
767           // Wait on writesSync
768
long timeout = tm.getTimeout();
769           if (timeout > 0)
770           {
771             long start = System.currentTimeMillis();
772             writesSync.wait(timeout);
773             long end = System.currentTimeMillis();
774             long remaining = timeout - (end - start);
775             if (remaining > 0)
776               tm.setTimeout(remaining);
777             else
778             {
779               String JavaDoc msg = Translate
780                   .get("scheduler.rollbacksavepoint.timeout.writeSync");
781               logger.warn(msg);
782               throw new SQLException JavaDoc(msg);
783             }
784           }
785           else
786             writesSync.wait();
787         }
788         catch (InterruptedException JavaDoc e)
789         {
790           String JavaDoc msg = Translate
791               .get("scheduler.rollbacksavepoint.timeout.writeSync")
792               + " (" + e + ")";
793           logger.error(msg);
794           throw new SQLException JavaDoc(msg);
795         }
796       pendingWrites++;
797
798       if (logger.isDebugEnabled())
799         logger.debug("Rollback " + savepointName
800             + " scheduled - current pending writes: " + pendingWrites);
801     }
802
803     this.rollbackTransaction(tm.getTransactionId(), savepointName);
804   }
805
806   /**
807    * Rollback a transaction given its id.
808    *
809    * @param transactionId the transaction id
810    */

811   protected abstract void rollbackTransaction(long transactionId);
812
813   /**
814    * Rollback a transaction given its id to a savepoint given its name.
815    *
816    * @param transactionId the transaction id
817    * @param savepointName the name of the savepoint
818    */

819   protected abstract void rollbackTransaction(long transactionId,
820       String JavaDoc savepointName);
821
822   /**
823    * Notify the completion of a rollback command.
824    *
825    * @param transactionId of the rollback commit
826    */

827   public final void rollbackCompleted(long transactionId)
828   {
829     // Take care of suspended transactions
830
synchronized (transactionSync)
831     {
832       pendingTransactions--;
833
834       if (logger.isDebugEnabled())
835         logger.debug("Rollback completed, remaining pending transactions: "
836             + pendingTransactions);
837
838       // If it is the last pending transaction to complete and we
839
// are waiting for pending transactions to complete, then wake
840
// up suspendNewTransactionsForCheckpoint()
841
if (suspendedTransactions && (pendingTransactions == 0))
842       {
843         synchronized (endOfCurrentTransactions)
844         {
845           endOfCurrentTransactions.notifyAll();
846         }
847       }
848     }
849     // Take care of suspended write
850
synchronized (writesSync)
851     {
852       pendingWrites--;
853
854       if (logger.isDebugEnabled())
855         logger.debug("Rollback completed, remaining pending writes: "
856             + pendingWrites);
857
858       // It this is the last write to complete and writes are
859
// suspended we have to notify suspendedWrites()
860
if (suspendedWrites && (pendingWrites == 0))
861       {
862         synchronized (endOfCurrentWrites)
863         {
864           endOfCurrentWrites.notifyAll();
865         }
866       }
867     }
868   }
869
870   /**
871    * Set an unnamed savepoint.
872    * <p>
873    * Calls the implementation specific setSavepointTransaction()
874    *
875    * @param tm transaction marker metadata
876    * @return savepoint Id
877    * @throws SQLException if an error occurs
878    */

879   public final int setSavepoint(TransactionMarkerMetaData tm)
880       throws SQLException JavaDoc
881   {
882     // Check if writes are suspended
883
synchronized (writesSync)
884     {
885       if (suspendedWrites)
886         try
887         {
888           // Wait on writesSync
889
long timeout = tm.getTimeout();
890           if (timeout > 0)
891           {
892             long start = System.currentTimeMillis();
893             writesSync.wait(timeout);
894             long end = System.currentTimeMillis();
895             long remaining = timeout - (end - start);
896             if (remaining > 0)
897               tm.setTimeout(remaining);
898             else
899             {
900               String JavaDoc msg = Translate
901                   .get("scheduler.setsavepoint.timeout.writeSync");
902               logger.warn(msg);
903               throw new SQLException JavaDoc(msg);
904             }
905           }
906           else
907             writesSync.wait();
908         }
909         catch (InterruptedException JavaDoc e)
910         {
911           String JavaDoc msg = Translate
912               .get("scheduler.setsavepoint.timeout.writeSync")
913               + " (" + e + ")";
914           logger.error(msg);
915           throw new SQLException JavaDoc(msg);
916         }
917       pendingWrites++;
918
919       if (logger.isDebugEnabled())
920         logger.debug("Set savepoint scheduled - current pending writes: "
921             + pendingWrites);
922     }
923
924     int savepointId = this.incrementSavepointId();
925     this.setSavepointTransaction(tm.getTransactionId(), String
926         .valueOf(savepointId));
927     return savepointId;
928   }
929
930   /**
931    * Set a named savepoint.
932    * <p>
933    * Calls the implementation specific setSavepointTransaction()
934    *
935    * @param tm transaction marker metadata
936    * @param name name of the savepoint
937    * @throws SQLException if an error occurs
938    */

939   public final void setSavepoint(TransactionMarkerMetaData tm, String JavaDoc name)
940       throws SQLException JavaDoc
941   {
942     // Check if writes are suspended
943
synchronized (writesSync)
944     {
945       if (suspendedWrites)
946         try
947         {
948           // Wait on writesSync
949
long timeout = tm.getTimeout();
950           if (timeout > 0)
951           {
952             long start = System.currentTimeMillis();
953             writesSync.wait(timeout);
954             long end = System.currentTimeMillis();
955             long remaining = timeout - (end - start);
956             if (remaining > 0)
957               tm.setTimeout(remaining);
958             else
959             {
960               String JavaDoc msg = Translate
961                   .get("scheduler.setsavepoint.timeout.writeSync");
962               logger.warn(msg);
963               throw new SQLException JavaDoc(msg);
964             }
965           }
966           else
967             writesSync.wait();
968         }
969         catch (InterruptedException JavaDoc e)
970         {
971           String JavaDoc msg = Translate
972               .get("scheduler.setsavepoint.timeout.writeSync")
973               + " (" + e + ")";
974           logger.error(msg);
975           throw new SQLException JavaDoc(msg);
976         }
977       pendingWrites++;
978
979       if (logger.isDebugEnabled())
980         logger.debug("Set savepoint " + name
981             + " scheduled - current pending writes: " + pendingWrites);
982     }
983
984     this.setSavepointTransaction(tm.getTransactionId(), name);
985   }
986
987   /**
988    * Set a savepoint given its name to a transaction given its id.
989    *
990    * @param transactionId the transaction id
991    * @param name the name of the savepoint
992    */

993   protected abstract void setSavepointTransaction(long transactionId,
994       String JavaDoc name);
995
996   /**
997    * Release a savepoint.
998    * <p>
999    * Calls the implementation specific releaseSavepointTransaction()
1000   *
1001   * @param tm transaction marker metadata
1002   * @param name name of the savepoint
1003   * @throws SQLException if an error occurs
1004   */

1005  public final void releaseSavepoint(TransactionMarkerMetaData tm, String JavaDoc name)
1006      throws SQLException JavaDoc
1007  {
1008    // Check if writes are suspended
1009
synchronized (writesSync)
1010    {
1011      if (suspendedWrites)
1012        try
1013        {
1014          // Wait on writesSync
1015
long timeout = tm.getTimeout();
1016          if (timeout > 0)
1017          {
1018            long start = System.currentTimeMillis();
1019            writesSync.wait(timeout);
1020            long end = System.currentTimeMillis();
1021            long remaining = timeout - (end - start);
1022            if (remaining > 0)
1023              tm.setTimeout(remaining);
1024            else
1025            {
1026              String JavaDoc msg = Translate
1027                  .get("scheduler.releasesavepoint.timeout.writeSync");
1028              logger.warn(msg);
1029              throw new SQLException JavaDoc(msg);
1030            }
1031          }
1032          else
1033            writesSync.wait();
1034        }
1035        catch (InterruptedException JavaDoc e)
1036        {
1037          String JavaDoc msg = Translate
1038              .get("scheduler.releasesavepoint.timeout.writeSync")
1039              + " (" + e + ")";
1040          logger.error(msg);
1041          throw new SQLException JavaDoc(msg);
1042        }
1043      pendingWrites++;
1044
1045      if (logger.isDebugEnabled())
1046        logger.debug("Release savepoint " + name
1047            + " scheduled - current pending writes: " + pendingWrites);
1048    }
1049
1050    this.releaseSavepointTransaction(tm.getTransactionId(), name);
1051  }
1052
1053  /**
1054   * Release a savepoint given its name from a transaction given its id.
1055   *
1056   * @param transactionId the transaction id
1057   * @param name the name of the savepoint
1058   */

1059  protected abstract void releaseSavepointTransaction(long transactionId,
1060      String JavaDoc name);
1061
1062  /**
1063   * Notify the conpletion of a savepoint action.
1064   *
1065   * @param transactionId the transaction identifier
1066   */

1067  public final void savepointCompleted(long transactionId)
1068  {
1069    synchronized (writesSync)
1070    {
1071      pendingWrites--;
1072
1073      if (logger.isDebugEnabled())
1074        logger.debug("Savepoint completed, remaining pending writes: "
1075            + pendingWrites);
1076
1077      // It this is the last write to complete and writes are
1078
// suspended we have to notify suspendedWrites()
1079
if (suspendedWrites && (pendingWrites == 0))
1080      {
1081        synchronized (endOfCurrentWrites)
1082        {
1083          endOfCurrentWrites.notifyAll();
1084        }
1085      }
1086    }
1087  }
1088
1089  //
1090
// Checkpoint management
1091
//
1092

1093  /**
1094   * Suspend all calls to begin() until all current transactions are finished in
1095   * order to store a checkpoint. This method returns when all pending
1096   * transactions have finished.
1097   * <p>
1098   * New transactions remain suspended until resumeNewTransactions() is called.
1099   *
1100   * @throws SQLException if an error occurs
1101   * @see #resumeNewTransactions()
1102   */

1103  public final void suspendNewTransactionsForCheckpoint() throws SQLException JavaDoc
1104  {
1105    synchronized (transactionSync)
1106    {
1107      suspendedTransactions = true;
1108      if (pendingTransactions == 0)
1109        return;
1110    }
1111
1112    synchronized (endOfCurrentTransactions)
1113    {
1114      // Here we have a potential synchronization problem since the last
1115
// transaction completion could have happened before we entered this
1116
// synchronized block. Therefore we recheck if there is effectively
1117
// still pending transactions. If this is not the case, we don't have
1118
// to sleep and we can immediately return.
1119
if (pendingTransactions == 0)
1120        return;
1121
1122      // Wait for pending transactions to end
1123
try
1124      {
1125        endOfCurrentTransactions.wait();
1126      }
1127      catch (InterruptedException JavaDoc e)
1128      {
1129        String JavaDoc msg = Translate.get("scheduler.suspend.transaction.failed", e);
1130        logger.error(msg);
1131        throw new SQLException JavaDoc(msg);
1132      }
1133    }
1134  }
1135
1136  /**
1137   * Resume new transactions that were suspended by
1138   * suspendNewTransactionsForCheckpoint().
1139   *
1140   * @see #suspendNewTransactionsForCheckpoint()
1141   */

1142  public final void resumeNewTransactions()
1143  {
1144    synchronized (transactionSync)
1145    {
1146      suspendedTransactions = false;
1147      // Wake up all pending begin statements
1148
transactionSync.notifyAll();
1149    }
1150  }
1151
1152  /**
1153   * Suspend all write queries. This method blocks until all pending writes are
1154   * completed.
1155   * <p>
1156   * Writes execution is resumed by calling resumeWrites()
1157   *
1158   * @throws SQLException if an error occurs
1159   * @see #resumeWrites()
1160   */

1161  public void suspendWrites() throws SQLException JavaDoc
1162  {
1163    synchronized (writesSync)
1164    {
1165      suspendedWrites = true;
1166      if (pendingWrites == 0)
1167        return;
1168    }
1169
1170    synchronized (endOfCurrentWrites)
1171    {
1172      // Here we have a potential synchronization problem since the last
1173
// write completion could have happened before we entered this
1174
// synchronized block. Therefore we recheck if there is effectively
1175
// still pending writes. If this is not the case, we don't have
1176
// to sleep and we can immediately return.
1177
if (pendingWrites == 0)
1178        return;
1179
1180      // Wait for pending transactions to end
1181
try
1182      {
1183        endOfCurrentWrites.wait();
1184      }
1185      catch (InterruptedException JavaDoc e)
1186      {
1187        String JavaDoc msg = Translate.get("scheduler.suspend.writes.failed", e);
1188        logger.error(msg);
1189        throw new SQLException JavaDoc(msg);
1190      }
1191    }
1192  }
1193
1194  /**
1195   * Resume the execution of write queries that were suspended by
1196   * suspendWrites().
1197   *
1198   * @see #suspendWrites()
1199   */

1200  public void resumeWrites()
1201  {
1202    synchronized (writesSync)
1203    {
1204      suspendedWrites = false;
1205      // Wake up all waiting writes
1206
writesSync.notifyAll();
1207    }
1208  }
1209
1210  //
1211
// Debug/Monitoring
1212
//
1213

1214  protected abstract String JavaDoc getXmlImpl();
1215
1216  /**
1217   * Get information about the Request Scheduler in xml format
1218   *
1219   * @return <code>String</code> containing information in xml
1220   */

1221  public String JavaDoc getXml()
1222  {
1223    StringBuffer JavaDoc info = new StringBuffer JavaDoc();
1224    info.append("<" + DatabasesXmlTags.ELT_RequestScheduler + ">");
1225    info.append(this.getXmlImpl());
1226    info.append("</" + DatabasesXmlTags.ELT_RequestScheduler + ">");
1227    return info.toString();
1228  }
1229
1230  /**
1231   * Returns live information on the scheduler
1232   *
1233   * @return array of data
1234   */

1235  public String JavaDoc[] getSchedulerData()
1236  {
1237    String JavaDoc[] data = new String JavaDoc[7];
1238    data[0] = "" + numberRead;
1239    data[1] = "" + numberWrite;
1240    data[2] = "" + pendingTransactions;
1241    data[3] = "" + pendingWrites;
1242    data[4] = "" + numberRead + numberWrite;
1243    data[5] = (suspendedTransactions) ? "1" : "0";
1244    data[6] = (suspendedWrites) ? "1" : "0";
1245    return data;
1246  }
1247
1248  /**
1249   * @return Returns the numberRead.
1250   */

1251  public int getNumberRead()
1252  {
1253    return numberRead;
1254  }
1255
1256  /**
1257   * @return Returns the numberWrite.
1258   */

1259  public int getNumberWrite()
1260  {
1261    return numberWrite;
1262  }
1263
1264  /**
1265   * @return Returns the pendingTransactions.
1266   */

1267  public int getPendingTransactions()
1268  {
1269    return pendingTransactions;
1270  }
1271
1272  /**
1273   * @return Returns the suspendedTransactions.
1274   */

1275  public boolean isSuspendedTransactions()
1276  {
1277    return suspendedTransactions;
1278  }
1279
1280  /**
1281   * @return Returns the suspendedWrites.
1282   */

1283  public boolean isSuspendedWrites()
1284  {
1285    return suspendedWrites;
1286  }
1287}
Popular Tags