KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > net > nutch > searcher > DistributedSearch


1 /* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
2 /* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
3
4 package net.nutch.searcher;
5
6 import java.net.InetSocketAddress JavaDoc;
7 import java.io.*;
8 import java.util.*;
9 import java.util.logging.Logger JavaDoc;
10
11 import net.nutch.parse.ParseData;
12 import net.nutch.parse.ParseText;
13 import net.nutch.util.LogFormatter;
14 import net.nutch.io.*;
15 import net.nutch.ipc.*;
16
17
/** Implements the search API over IPC connections. */
19 public class DistributedSearch {
/** Logger shared by the server and client sides of distributed search. */
public static final Logger LOG =
  LogFormatter.getLogger("net.nutch.searcher.DistributedSearch");

private DistributedSearch() {} // no public ctor: only the nested classes are used

// Op codes for IPC calls.  Each identifies one remote method.  The code is
// written as the first byte of every Param and Result so that the receiving
// side knows which operand/value types to deserialize.
private static final byte OP_SEGMENTS = (byte)0;
private static final byte OP_SEARCH = (byte)1;
private static final byte OP_EXPLAIN = (byte)2;
private static final byte OP_DETAILS = (byte)3;
private static final byte OP_SUMMARY = (byte)4;
private static final byte OP_CONTENT = (byte)5;
private static final byte OP_ANCHORS = (byte)6;
private static final byte OP_PARSEDATA = (byte)7;
private static final byte OP_PARSETEXT = (byte)8;
private static final byte OP_FETCHDATE = (byte)9;

/** Names of the op codes, indexed by op code (used when logging requests).
 * The length is derived from the highest op code, so adding a new op code
 * does not also require updating a separate hard-coded array size. */
private static final String[] OP_NAMES = new String[OP_FETCHDATE + 1];
static {
  OP_NAMES[OP_SEGMENTS] = "getSegmentNames";
  OP_NAMES[OP_SEARCH] = "search";
  OP_NAMES[OP_EXPLAIN] = "getExplanation";
  OP_NAMES[OP_DETAILS] = "getDetails";
  OP_NAMES[OP_SUMMARY] = "getSummary";
  OP_NAMES[OP_CONTENT] = "getContent";
  OP_NAMES[OP_ANCHORS] = "getAnchors";
  OP_NAMES[OP_PARSEDATA] = "getParseData";
  OP_NAMES[OP_PARSETEXT] = "getParseText";
  OP_NAMES[OP_FETCHDATE] = "getFetchDate";
}
51
52   /** The parameter passed with IPC requests. Public only so that {@link
53    * Server} can construct instances. */

54   public static class Param implements Writable {
55     private byte op; // the op code
56
private Writable first; // the first operand
57
private Writable second; // the second operand
58

59     public Param() {}
60
61     Param(byte op, Writable first) {
62       this(op, first, NullWritable.get());
63     }
64
65     Param(byte op, Writable first, Writable second) {
66       this.op = op;
67       this.first = first;
68       this.second = second;
69     }
70
71     public void write(DataOutput out) throws IOException {
72       out.writeByte(op);
73       first.write(out);
74       second.write(out);
75     }
76
77     public void readFields(DataInput in) throws IOException {
78       op = in.readByte();
79
80       switch (op) {
81       case OP_SEGMENTS:
82         first = NullWritable.get();
83         second = NullWritable.get();
84         break;
85       case OP_SEARCH:
86         first = new Query();
87         second = new IntWritable();
88         break;
89       case OP_EXPLAIN:
90         first = new Query();
91         second = new Hit();
92         break;
93       case OP_DETAILS:
94         first = new Hit();
95         second = NullWritable.get();
96         break;
97       case OP_SUMMARY:
98         first = new HitDetails();
99         second = new Query();
100         break;
101       case OP_CONTENT:
102       case OP_ANCHORS:
103       case OP_PARSEDATA:
104       case OP_PARSETEXT:
105       case OP_FETCHDATE:
106         first = new HitDetails();
107         second = NullWritable.get();
108         break;
109       default:
110         throw new RuntimeException JavaDoc("Unknown op code: " + op);
111       }
112
113       first.readFields(in);
114       second.readFields(in);
115     }
116   }
117
118   /** The parameter returned with IPC responses. Public only so that {@link
119    * Client} can construct instances. */

120   public static class Result implements Writable {
121     private byte op;
122     private Writable value;
123
124     public Result() {}
125
126     Result(byte op, Writable value) {
127       this.op = op;
128       this.value = value;
129     }
130
131     public void write(DataOutput out) throws IOException {
132       out.writeByte(op);
133       value.write(out);
134     }
135
136     public void readFields(DataInput in) throws IOException {
137       op = in.readByte();
138
139       switch (op) {
140       case OP_SEGMENTS:
141         value = new ArrayWritable(UTF8.class);
142         break;
143       case OP_SEARCH:
144         value = new Hits();
145         break;
146       case OP_EXPLAIN:
147         value = new UTF8();
148         break;
149       case OP_DETAILS:
150         value = new HitDetails();
151         break;
152       case OP_SUMMARY:
153         value = new UTF8();
154         break;
155       case OP_CONTENT:
156         value = new BytesWritable();
157         break;
158       case OP_ANCHORS:
159         value = new ArrayWritable(UTF8.class);
160         break;
161       case OP_PARSEDATA:
162         value = new ParseData();
163         break;
164       case OP_PARSETEXT:
165         value = new ParseText();
166         break;
167       case OP_FETCHDATE:
168         value = new LongWritable();
169         break;
170       default:
171         throw new RuntimeException JavaDoc("Unknown op code: " + op);
172       }
173
174       value.readFields(in);
175     }
176   }
177
178   /** The search server. */
179   public static class Server extends net.nutch.ipc.Server {
180     private NutchBean bean;
181
182     /** Construct a search server on the index and segments in the named
183      * directory, listening on the named port. */

184     public Server(File directory, int port) throws IOException {
185       super(port, Param.class, 10);
186       this.bean = new NutchBean(directory);
187     }
188
189     public Writable call(Writable param) throws IOException {
190       Param p = (Param)param;
191       logRequest(p);
192       Writable value;
193       switch (p.op) {
194       case OP_SEGMENTS:
195         value = new ArrayWritable(bean.getSegmentNames());
196         break;
197       case OP_SEARCH:
198         value = bean.search((Query)p.first, ((IntWritable)p.second).get());
199         break;
200       case OP_EXPLAIN:
201         value = new UTF8(bean.getExplanation((Query)p.first, (Hit)p.second));
202         break;
203       case OP_DETAILS:
204         value = bean.getDetails((Hit)p.first);
205         break;
206       case OP_SUMMARY:
207         value = new UTF8(bean.getSummary((HitDetails)p.first,(Query)p.second));
208         break;
209       case OP_CONTENT:
210         value = new BytesWritable(bean.getContent((HitDetails)p.first));
211         break;
212       case OP_ANCHORS:
213         value = new ArrayWritable(bean.getAnchors((HitDetails)p.first));
214         break;
215       case OP_PARSEDATA:
216         value = bean.getParseData((HitDetails)p.first);
217         break;
218       case OP_PARSETEXT:
219         value = bean.getParseText((HitDetails)p.first);
220         break;
221       case OP_FETCHDATE:
222         value = new LongWritable(bean.getFetchDate((HitDetails)p.first));
223         break;
224       default:
225         throw new RuntimeException JavaDoc("Unknown op code: " + p.op);
226       }
227       
228       //LOG.info("Result: "+value);
229

230       return new Result(p.op, value);
231
232     }
233
234     private static void logRequest(Param p) {
235       StringBuffer JavaDoc buffer = new StringBuffer JavaDoc();
236       buffer.append(Thread.currentThread().getName());
237       buffer.append(": ");
238       buffer.append(OP_NAMES[p.op]);
239       buffer.append("(");
240       if (p.first != NullWritable.get()) {
241         buffer.append(p.first);
242         if (p.second != NullWritable.get()) {
243           buffer.append(", ");
244           buffer.append(p.second);
245         }
246       }
247       buffer.append(")");
248       LOG.info(buffer.toString());
249     }
250
251     /** Runs a search server. */
252     public static void main(String JavaDoc[] args) throws Exception JavaDoc {
253       String JavaDoc usage = "DistributedSearch$Server <port> <index dir>";
254
255       if (args.length == 0 || args.length > 2) {
256         System.err.println(usage);
257         System.exit(-1);
258       }
259
260       int port = Integer.parseInt(args[0]);
261       File directory = new File(args[1]);
262
263       Server server = new Server(directory, port);
264       //server.setTimeout(Integer.MAX_VALUE);
265
server.start();
266       server.join();
267     }
268
269   }
270
271   /** The search client. */
272   public static class Client extends net.nutch.ipc.Client
273     implements Searcher, HitDetailer, HitSummarizer, HitContent {
274
275     private InetSocketAddress JavaDoc[] addresses;
276     private HashMap segmentToAddress = new HashMap();
277
278     /** Construct a client talking to servers listed in the named file.
279      * Each line in the file lists a server hostname and port, separated by
280      * whitespace.
281      */

282
283     public Client(File file) throws IOException {
284       this(readConfig(file));
285     }
286
287     private static InetSocketAddress JavaDoc[] readConfig(File config)
288       throws IOException {
289       BufferedReader reader = new BufferedReader(new FileReader(config));
290       ArrayList addrs = new ArrayList();
291       String JavaDoc line;
292       while ((line = reader.readLine()) != null) {
293         StringTokenizer tokens = new StringTokenizer(line);
294         if (tokens.hasMoreTokens()) {
295           String JavaDoc host = tokens.nextToken();
296           if (tokens.hasMoreTokens()) {
297             String JavaDoc port = tokens.nextToken();
298             addrs.add(new InetSocketAddress JavaDoc(host, Integer.parseInt(port)));
299             LOG.info("Client adding server " + host + ":" + port);
300           }
301         }
302       }
303       return (InetSocketAddress JavaDoc[])
304         addrs.toArray(new InetSocketAddress JavaDoc[addrs.size()]);
305     }
306
307     /** Construct a client talking to the named servers. */
308     public Client(InetSocketAddress JavaDoc[] addresses) throws IOException {
309       super(Result.class);
310       
311       this.addresses = addresses;
312
313       // build segmentToAddress map
314
Param param = new Param(OP_SEGMENTS, NullWritable.get());
315       Writable[] params = new Writable[addresses.length];
316       for (int i = 0; i < params.length; i++) {
317         params[i] = param; // build param for parallel call
318
}
319       Writable[] results = call(params, addresses); // make parallel call
320

321       for (int i = 0; i < results.length; i++) { // process results of call
322
Result result = (Result)results[i];
323         if (result == null) {
324           LOG.warning("Client: no segments from: " + addresses[i]);
325           continue;
326         }
327         String JavaDoc[] segments = ((ArrayWritable)result.value).toStrings();
328         for (int j = 0; j < segments.length; j++) {
329           LOG.info("Client: segment "+segments[j]+" at "+addresses[i]);
330           segmentToAddress.put(segments[j], addresses[i]);
331         }
332       }
333     }
334
335     /** Return the names of segments searched. */
336     public String JavaDoc[] getSegmentNames() {
337       return (String JavaDoc[])segmentToAddress.keySet().toArray(new String JavaDoc[segmentToAddress.size()]);
338     }
339
340     public Hits search(Query query, int numHits) throws IOException {
341       long totalHits = 0;
342       Hits[] segmentHits = new Hits[addresses.length];
343
344       Param param = new Param(OP_SEARCH, query, new IntWritable(numHits));
345       Writable[] params = new Writable[addresses.length];
346       for (int i = 0; i < params.length; i++) {
347         params[i] = param; // build param for parallel call
348
}
349       Writable[] results = call(params, addresses); // make parallel call
350

351       TreeSet queue = new TreeSet(); // cull top hits from results
352
float minScore = 0.0f;
353       for (int i = 0; i < results.length; i++) {
354         Result result = (Result)results[i];
355         if (result == null) continue;
356         Hits hits = (Hits)result.value;
357         totalHits += hits.getTotal();
358         for (int j = 0; j < hits.getLength(); j++) {
359           Hit h = hits.getHit(j);
360           if (h.getScore() >= minScore) {
361             queue.add(new Hit(i, h.getIndexDocNo(),h.getScore(),h.getSite()));
362             if (queue.size() > numHits) { // if hit queue overfull
363
queue.remove(queue.last()); // remove lowest in hit queue
364
minScore = ((Hit)queue.last()).getScore(); // reset minScore
365
}
366           }
367         }
368       }
369       return new Hits(totalHits, (Hit[])queue.toArray(new Hit[queue.size()]));
370     }
371     
372     public String JavaDoc getExplanation(Query query, Hit hit) throws IOException {
373       Param param = new Param(OP_EXPLAIN, query, hit);
374       Result result = (Result)call(param, addresses[hit.getIndexNo()]);
375       return result.value.toString();
376     }
377     
378     public HitDetails getDetails(Hit hit) throws IOException {
379       Param param = new Param(OP_DETAILS, hit);
380       Result result = (Result)call(param, addresses[hit.getIndexNo()]);
381       return (HitDetails)result.value;
382     }
383     
384     public HitDetails[] getDetails(Hit[] hits) throws IOException {
385       Writable[] params = new Writable[hits.length];
386       InetSocketAddress JavaDoc[] addrs = new InetSocketAddress JavaDoc[hits.length];
387       for (int i = 0; i < hits.length; i++) {
388         params[i] = new Param(OP_DETAILS, hits[i]);
389         addrs[i] = addresses[hits[i].getIndexNo()];
390       }
391       Writable[] writables = call(params, addrs);
392       HitDetails[] results = new HitDetails[writables.length];
393       for (int i = 0; i < results.length; i++) {
394         results[i] = (HitDetails)((Result)writables[i]).value;
395       }
396       return results;
397     }
398
399
400     public String JavaDoc getSummary(HitDetails hit, Query query) throws IOException {
401       Param param = new Param(OP_SUMMARY, hit, query);
402       InetSocketAddress JavaDoc address =
403         (InetSocketAddress JavaDoc)segmentToAddress.get(hit.getValue("segment"));
404       Result result = (Result)call(param, address);
405       return result.value.toString();
406     }
407
408     public String JavaDoc[] getSummary(HitDetails[] hits, Query query)
409       throws IOException {
410       Writable[] params = new Writable[hits.length];
411       InetSocketAddress JavaDoc[] addrs = new InetSocketAddress JavaDoc[hits.length];
412       for (int i = 0; i < hits.length; i++) {
413         HitDetails hit = hits[i];
414         params[i] = new Param(OP_SUMMARY, hit, query);
415         addrs[i] =
416           (InetSocketAddress JavaDoc)segmentToAddress.get(hit.getValue("segment"));
417       }
418       Writable[] results = call(params, addrs);
419       String JavaDoc[] strings = new String JavaDoc[results.length];
420       for (int i = 0; i < results.length; i++) {
421         if (results[i] != null)
422           strings[i] = ((Result)results[i]).value.toString();
423       }
424       return strings;
425     }
426     
427     public byte[] getContent(HitDetails hit) throws IOException {
428       Param param = new Param(OP_CONTENT, hit);
429       InetSocketAddress JavaDoc address =
430         (InetSocketAddress JavaDoc)segmentToAddress.get(hit.getValue("segment"));
431       Result result = (Result)call(param, address);
432       return ((BytesWritable)result.value).get();
433     }
434     
435     public ParseData getParseData(HitDetails hit) throws IOException {
436         Param param = new Param(OP_PARSEDATA, hit);
437         InetSocketAddress JavaDoc address =
438           (InetSocketAddress JavaDoc)segmentToAddress.get(hit.getValue("segment"));
439         Result result = (Result)call(param, address);
440         return (ParseData)result.value;
441       }
442       
443     public ParseText getParseText(HitDetails hit) throws IOException {
444         Param param = new Param(OP_PARSETEXT, hit);
445         InetSocketAddress JavaDoc address =
446           (InetSocketAddress JavaDoc)segmentToAddress.get(hit.getValue("segment"));
447         Result result = (Result)call(param, address);
448         return (ParseText)result.value;
449     }
450       
451     public String JavaDoc[] getAnchors(HitDetails hit) throws IOException {
452       Param param = new Param(OP_ANCHORS, hit);
453       InetSocketAddress JavaDoc address =
454         (InetSocketAddress JavaDoc)segmentToAddress.get(hit.getValue("segment"));
455       Result result = (Result)call(param, address);
456       return ((ArrayWritable)result.value).toStrings();
457     }
458
459     public long getFetchDate(HitDetails hit) throws IOException {
460         Param param = new Param(OP_FETCHDATE, hit);
461         InetSocketAddress JavaDoc address =
462           (InetSocketAddress JavaDoc)segmentToAddress.get(hit.getValue("segment"));
463         Result result = (Result)call(param, address);
464         return ((LongWritable)result.value).get();
465     }
466       
467     public static void main(String JavaDoc[] args) throws Exception JavaDoc {
468       String JavaDoc usage = "DistributedSearch$Client query <host> <port> ...";
469
470       if (args.length == 0) {
471         System.err.println(usage);
472         System.exit(-1);
473       }
474
475       Query query = Query.parse(args[0]);
476       
477       InetSocketAddress JavaDoc[] addresses = new InetSocketAddress JavaDoc[(args.length-1)/2];
478       for (int i = 0; i < (args.length-1)/2; i++) {
479         addresses[i] =
480           new InetSocketAddress JavaDoc(args[i*2+1], Integer.parseInt(args[i*2+2]));
481       }
482
483       Client client = new Client(addresses);
484       //client.setTimeout(Integer.MAX_VALUE);
485

486       Hits hits = client.search(query, 10);
487       System.out.println("Total hits: " + hits.getTotal());
488       for (int i = 0; i < hits.getLength(); i++) {
489         System.out.println(" "+i+" "+ client.getDetails(hits.getHit(i)));
490       }
491
492     }
493
494
495   }
496
497 }
498
Popular Tags