KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > dbunit > dataset > stream > StreamingIterator


1 /*
2  *
3  * The DbUnit Database Testing Framework
4  * Copyright (C)2002-2004, DbUnit.org
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  */

21 package org.dbunit.dataset.stream;
22
23 import org.dbunit.DatabaseUnitRuntimeException;
24 import org.dbunit.dataset.*;
25 import org.dbunit.util.concurrent.BoundedBuffer;
26 import org.dbunit.util.concurrent.Channel;
27 import org.dbunit.util.concurrent.Puttable;
28 import org.dbunit.util.concurrent.Takable;
29
30 /**
31  * @author Manuel Laflamme
32  * @since Apr 17, 2003
33  * @version $Revision: 1.3 $
34  */

35 public class StreamingIterator implements ITableIterator
36 {
37     private static final Object JavaDoc EOD = new Object JavaDoc(); // end of dataset marker
38

39     private final Takable _channel;
40     private StreamingTable _activeTable;
41     private Object JavaDoc _taken = null;
42     private boolean _eod = false;
43
44     public StreamingIterator(IDataSetProducer source) throws DataSetException
45     {
46         Channel channel = new BoundedBuffer(30);
47         _channel = channel;
48
49         AsynchronousConsumer consumer = new AsynchronousConsumer(source, channel);
50         Thread JavaDoc thread = new Thread JavaDoc(consumer);
51         thread.setDaemon(true);
52         thread.start();
53
54         // Take first element from asyncronous handler
55
try
56         {
57             _taken = _channel.take();
58         }
59         catch (InterruptedException JavaDoc e)
60         {
61             throw new DataSetException(e);
62         }
63     }
64
65     ////////////////////////////////////////////////////////////////////////////
66
// ITableIterator interface
67

68     public boolean next() throws DataSetException
69     {
70         // End of dataset has previously been reach
71
if (_eod)
72         {
73             return false;
74         }
75
76         // Iterate to the end of current table.
77
while (_activeTable != null && _activeTable.next())
78             ;
79
80         // End of dataset is reach
81
if (_taken == EOD)
82         {
83             _eod = true;
84             _activeTable = null;
85
86 // System.out.println("End of iterator! - " + System.currentTimeMillis());
87
return false;
88         }
89
90         // New table
91
if (_taken instanceof ITableMetaData)
92         {
93             _activeTable = new StreamingTable((ITableMetaData)_taken);
94             return true;
95         }
96
97         throw new IllegalStateException JavaDoc(
98                 "Unexpected object taken from asyncronous handler: " + _taken);
99     }
100
101     public ITableMetaData getTableMetaData() throws DataSetException
102     {
103         return _activeTable.getTableMetaData();
104     }
105
106     public ITable getTable() throws DataSetException
107     {
108         return _activeTable;
109     }
110
111     ////////////////////////////////////////////////////////////////////////////
112
// StreamingTable class
113

114     private class StreamingTable extends AbstractTable
115     {
116         private ITableMetaData _metaData;
117         private int _lastRow = -1;
118         private boolean _eot = false;
119         private Object JavaDoc[] _rowValues;
120
121         public StreamingTable(ITableMetaData metaData)
122         {
123             _metaData = metaData;
124         }
125
126         boolean next() throws DataSetException
127         {
128             // End of table has previously been reach
129
if (_eot)
130             {
131                 return false;
132             }
133
134             try
135             {
136                 _taken = _channel.take();
137                 if (!(_taken instanceof Object JavaDoc[]))
138                 {
139                     _eot = true;
140                     return false;
141                 }
142
143                 _lastRow++;
144                 _rowValues = (Object JavaDoc[])_taken;
145                 return true;
146             }
147             catch (InterruptedException JavaDoc e)
148             {
149                 throw new DataSetException();
150             }
151         }
152
153         ////////////////////////////////////////////////////////////////////////
154
// ITable interface
155

156         public ITableMetaData getTableMetaData()
157         {
158             return _metaData;
159         }
160
161         public int getRowCount()
162         {
163             throw new UnsupportedOperationException JavaDoc();
164         }
165
166         public Object JavaDoc getValue(int row, String JavaDoc column) throws DataSetException
167         {
168             // Iterate up to specified row
169
while (!_eot && row > _lastRow)
170             {
171                 next();
172             }
173
174             if (row < _lastRow)
175             {
176                 throw new UnsupportedOperationException JavaDoc("Cannot go backward!");
177             }
178
179             if (_eot || row > _lastRow)
180             {
181                 throw new RowOutOfBoundsException(row + " > " + _lastRow);
182             }
183
184             return _rowValues[getColumnIndex(column)];
185         }
186
187     }
188
189     ////////////////////////////////////////////////////////////////////////////
190
// AsynchronousConsumer class
191

192     private static class AsynchronousConsumer implements Runnable JavaDoc, IDataSetConsumer
193     {
194         private final IDataSetProducer _producer;
195         private final Puttable _channel;
196
197         public AsynchronousConsumer(IDataSetProducer source, Puttable channel)
198         {
199             _producer = source;
200             _channel = channel;
201         }
202
203         ////////////////////////////////////////////////////////////////////////
204
// Runnable interface
205

206         public void run()
207         {
208             try
209             {
210                 _producer.setConsumer(this);
211                 _producer.produce();
212 // System.out.println("End of thread! - " + System.currentTimeMillis());
213
}
214             catch (DataSetException e)
215             {
216                 throw new DatabaseUnitRuntimeException(e);
217             }
218         }
219
220         ////////////////////////////////////////////////////////////////////////
221
// IDataSetConsumer interface
222

223         public void startDataSet() throws DataSetException
224         {
225         }
226
227         public void endDataSet() throws DataSetException
228         {
229             try
230             {
231                 _channel.put(EOD);
232             }
233             catch (InterruptedException JavaDoc e)
234             {
235                 throw new DataSetException();
236             }
237         }
238
239         public void startTable(ITableMetaData metaData) throws DataSetException
240         {
241             try
242             {
243                 _channel.put(metaData);
244             }
245             catch (InterruptedException JavaDoc e)
246             {
247                 throw new DataSetException();
248             }
249         }
250
251         public void endTable() throws DataSetException
252         {
253         }
254
255         public void row(Object JavaDoc[] values) throws DataSetException
256         {
257             try
258             {
259                 _channel.put(values);
260             }
261             catch (InterruptedException JavaDoc e)
262             {
263                 throw new DataSetException();
264             }
265         }
266     }
267 }
268
Popular Tags