public interface Pusher {

  /**
   * Readiness states reported by a Pusher.
   */
  enum PusherStatus {
    OK,
    LOW_MEMORY,
    LOCAL_FEED_BACKLOG,
    GSA_FEED_BACKLOG,
    DISABLED
  }

  /**
   * Accepts an spi Document and pushes it along, presumably to the GSA Feed.
   *
   * @param document the Document to push
   * @return the resulting PusherStatus; {@code OK} means the Pusher may
   *         accept more documents
   * @throws RepositoryException if transient error accessing the Repository
   * @throws RepositoryDocumentException if fatal error accessing the Document
   * @throws FeedException if a transient Feed error occurs in the Pusher
   * @throws PushException if a transient error occurs in the Pusher
   */
  PusherStatus take(Document document)
      throws PushException, FeedException, RepositoryException;

  /**
   * Finishes processing a document feed.  Callers that anticipate no further
   * calls to {@link #take(Document)} should invoke this method so that the
   * Pusher may send a cached, accumulated Feed to the feed processor.
   *
   * @throws RepositoryException if transient error accessing the Repository
   * @throws RepositoryDocumentException if fatal error accessing the Document
   * @throws FeedException if a transient Feed error occurs in the Pusher
   * @throws PushException if a transient error occurs in the Pusher
   */
  void flush() throws PushException, FeedException, RepositoryException;

  /**
   * Cancels a feed, discarding any accumulated feed data.
   */
  void cancel();

  /**
   * Reports the current pusher status.
   *
   * @return the current PusherStatus
   * @throws RepositoryException if transient error accessing the Repository
   * @throws FeedException if a transient Feed error occurs in the Pusher
   * @throws PushException if a transient error occurs in the Pusher
   */
  PusherStatus getPusherStatus()
      throws PushException, FeedException, RepositoryException;
}

该接口通过 PusherStatus take(Document document) 方法发送数据,我们可以在其实现类 DocPusher 中查看该发送方法的源码:

/**   * Takes a Document and sends a the feed to the GSA.   *   * @param document Document corresponding to the document.   * @return true if Pusher should accept more documents, false otherwise.   * @throws PushException if Pusher problem   * @throws FeedException if transient Feed problem   * @throws RepositoryDocumentException if fatal Document problem   * @throws RepositoryException if transient Repository problem   */  @Override  public PusherStatus take(Document document)      throws PushException, FeedException, RepositoryException {    if (feedSender.isShutdown()) {      return PusherStatus.DISABLED;    }    checkSubmissions();    // Apply any configured Document filters to the document.    document = documentFilterFactory.newDocumentFilter(document);    FeedType feedType;    try {      feedType = DocUtils.getFeedType(document);    } catch (RuntimeException e) {      LOGGER.log(Level.WARNING,          "Rethrowing RuntimeException as RepositoryDocumentException", e);      throw new RepositoryDocumentException(e);    }    // All feeds in a feed file must be of the same type.    // If the feed would change type, send the feed off to the GSA    // and start a new one.    // TODO: Fix this check to allow ACLs in any type feed.    if (xmlFeed != null && !feedType.isCompatible(xmlFeed.getFeedType())) {      if (LOGGER.isLoggable(Level.FINE)) {        LOGGER.fine("A new feedType, " + feedType + ", requires a new feed for "            + connectorName + ". Closing feed and sending to GSA.");      }      submitFeed();    }    if (xmlFeed == null) {      if (LOGGER.isLoggable(Level.FINE)) {        LOGGER.fine("Creating new " + feedType + " feed for " + connectorName);      }      try {        startNewFeed(feedType);      } catch (OutOfMemoryError me) {        throw new PushException("Unable to allocate feed buffer.  
Try reducing"            + " the maxFeedSize setting, reducing the number of connector"            + " intances, or adjusting the JVM heap size parameters.", me);      }    }    boolean isThrowing = false;    int resetPoint = xmlFeed.size();    int resetCount = xmlFeed.getRecordCount();    try {      if (LOGGER.isLoggable(Level.FINER)) {        LOGGER.log(Level.FINER, "DOCUMENT: Adding document with docid={0} and "            + "searchurl={1} from connector {2} to feed.", new Object[] {            DocUtils.getOptionalString(document, SpiConstants.PROPNAME_DOCID),            DocUtils.getOptionalString(document,              SpiConstants.PROPNAME_SEARCHURL),            connectorName});      }      // Add this document to the feed.      xmlFeed.addRecord(document);      // If the feed is full, send it off to the GSA.      if (xmlFeed.isFull() || lowMemory()) {        if (LOGGER.isLoggable(Level.FINE)) {          LOGGER.fine("Feed for " + connectorName + " has grown to "              + xmlFeed.size() + " bytes. Closing feed and sending to GSA.");        }        submitFeed();        return getPusherStatus();      }      // Indicate that this Pusher may accept more documents.      return PusherStatus.OK;    } catch (OutOfMemoryError me) {      resetFeed(resetPoint, resetCount);      throw new PushException("Out of memory building feed, retrying.", me);    } catch (RuntimeException e) {      resetFeed(resetPoint, resetCount);      LOGGER.log(Level.WARNING,          "Rethrowing RuntimeException as RepositoryDocumentException", e);      throw new RepositoryDocumentException(e);    } catch (RepositoryDocumentException rde) {      // Skipping this document, remove it from the feed.      
resetFeed(resetPoint, resetCount);      throw rde;    } catch (IOException ioe) {      LOGGER.log(Level.SEVERE, "IOException while reading: skipping", ioe);      resetFeed(resetPoint, resetCount);      Throwable t = ioe.getCause();      isThrowing = true;      if (t != null && (t instanceof RepositoryException)) {        throw (RepositoryException) t;      } else {        throw new RepositoryDocumentException("I/O error reading data", ioe);      }    }  }

在上面的方法中,首先需要将参数 Document document 对象经过包装(如 Base64 编码等)添加到 xmlFeed 集合中,当 xmlFeed 集合满足条件的时候才向数据服务器发送过去,即每次向数据服务器发送的是 document 对象集合,而不是单独的 document 对象


/**     * Takes the accumulated XmlFeed and sends the feed to the GSA.     *      * @throws PushException     *             if Pusher problem     * @throws FeedException     *             if transient Feed problem     * @throws RepositoryException     */    private void submitFeed() throws PushException, FeedException,            RepositoryException {        if (xmlFeed == null) {            return;        }        final XmlFeed feed = xmlFeed;        xmlFeed = null;        final String logMessage;        if (feedLog != null) {            logMessage = feedLog.toString();            feedLog = null;        } else {            logMessage = null;        }        try {            feed.close();        } catch (IOException ioe) {            throw new PushException("Error closing feed", ioe);        }        try {            // Send the feed to the GSA in a separate thread.            FutureTask
future = new FutureTask
( new Callable
() { public String call() throws PushException, FeedException, RepositoryException { try { NDC.push("Feed " + feed.getDataSource()); return submitFeed(feed, logMessage); } finally { NDC.remove(); } } }); feedSender.execute(future); // Add the future to list of outstanding submissions. synchronized (submissions) { submissions.add(future); } } catch (RejectedExecutionException ree) { throw new FeedException("Asynchronous feed was rejected. ", ree); } }

该方法首先将数据发送方法封装到 FutureTask 对象的 call() 方法里面,然后在线程池里面执行之,最后将 future 结果句柄添加到 LinkedList<FutureTask<String>> submissions 集合


/**     * Takes the supplied XmlFeed and sends that feed to the GSA.     *      * @param feed     *            an XmlFeed     * @param logMessage     *            a Feed Log message     * @return response String from GSA     * @throws PushException     *             if Pusher problem     * @throws FeedException     *             if transient Feed problem     * @throws RepositoryException     */    private String submitFeed(XmlFeed feed, String logMessage)            throws PushException, FeedException, RepositoryException {                if (LOGGER.isLoggable(Level.FINE)) {            LOGGER.fine("Submitting " + feed.getFeedType() + " feed for "                    + feed.getDataSource() + " to the GSA. "                    + feed.getRecordCount() + " records totaling "                    + feed.size() + " bytes.");        }        // Write the generated feedLog message to the feed logger.        if (logMessage != null && FEED_LOGGER.isLoggable(FEED_LOG_LEVEL)) {            FEED_LOGGER.log(FEED_LOG_LEVEL, logMessage);        }        //将xmlfeed写入临时文件        // Write the Feed to the TeedFeedFile, if one was specified.        
String teedFeedFilename = Context.getInstance().getTeedFeedFile();        // String teedFeedFilename = "D:/files/google2.txt";        if (teedFeedFilename != null) {            boolean isThrowing = false;            OutputStream os = null;            try {                os = new FileOutputStream(teedFeedFilename, true);                feed.writeTo(os);            } catch (IOException e) {                isThrowing = true;                throw new FeedException("Cannot write to file: "                        + teedFeedFilename, e);            } finally {                if (os != null) {                    try {                        os.close();                    } catch (IOException e) {                        if (!isThrowing) {                            throw new FeedException("Cannot write to file: "                                    + teedFeedFilename, e);                        }                    }                }            }        }        String gsaResponse = feedConnection.sendData(feed);        if (!gsaResponse.equals(GsaFeedConnection.SUCCESS_RESPONSE)) {            String eMessage = gsaResponse;            if (GsaFeedConnection.UNAUTHORIZED_RESPONSE.equals(gsaResponse)) {                eMessage += ": Client is not authorized to send feeds. Make "                        + "sure the GSA is configured to trust feeds from your host.";            }            if (GsaFeedConnection.INTERNAL_ERROR_RESPONSE.equals(gsaResponse)) {                eMessage += ": Check GSA status or feed format.";            }            throw new PushException(eMessage);        }        return gsaResponse;    }



/* @Override */
/**
 * Sends the given feed data and records in {@code gotFeedError} whether the
 * attempt failed.
 *
 * @param feedData the feed to send; cast to {@code XmlFeed}
 * @return the response returned by {@code sendFeedData}
 * @throws FeedException if {@code sendFeedData} fails; {@code gotFeedError}
 *         is set to {@code true} before the exception is rethrown
 */
public String sendData(FeedData feedData) throws FeedException {
    final String reply;
    try {
        reply = sendFeedData((XmlFeed) feedData);
    } catch (FeedException fe) {
        gotFeedError = true;
        throw fe;
    }
    // Any response other than the success response (ignoring case) counts
    // as a feed error.
    gotFeedError = !reply.equalsIgnoreCase(SUCCESS_RESPONSE);
    return reply;
}

进一步调用sendFeedData(XmlFeed feed)方法发送

private String sendFeedData(XmlFeed feed) throws FeedException {        String feedType = feed.getFeedType().toLegacyString();        String dataSource = feed.getDataSource();        OutputStream outputStream;        HttpURLConnection uc;        StringBuilder buf = new StringBuilder();        byte[] prefix;        byte[] suffix;        try {            // Build prefix.            controlHeader(buf, "datasource", ServletUtil.MIMETYPE_TEXT_PLAIN);            buf.append(dataSource).append(CRLF);            controlHeader(buf, "feedtype", ServletUtil.MIMETYPE_TEXT_PLAIN);            buf.append(feedType).append(CRLF);            controlHeader(buf, "data", ServletUtil.MIMETYPE_XML);            prefix = buf.toString().getBytes("UTF-8");            // Build suffix.            buf.setLength(0);            buf.append(CRLF).append("--").append(BOUNDARY).append("--")                    .append(CRLF);            suffix = buf.toString().getBytes("UTF-8");            LOGGER.finest("Opening feed connection to " + feedUrl);                        synchronized (this) {                uc = (HttpURLConnection) feedUrl.openConnection();            }            if (uc instanceof HttpsURLConnection && !validateCertificate) {                SslUtil.setTrustingHttpsOptions((HttpsURLConnection) uc);            }                      uc.setRequestProperty("Charsert", "UTF-8");            uc.setDoInput(true);            uc.setDoOutput(true);            uc.setFixedLengthStreamingMode(prefix.length + feed.size()                    + suffix.length);                        uc.setRequestProperty("Content-Type",                    "multipart/form-data; boundary=" + BOUNDARY);                        outputStream = uc.getOutputStream();        } catch (IOException ioe) {            throw new FeedException(feedUrl.toString(), ioe);        } catch (GeneralSecurityException e) {            throw new FeedException(feedUrl.toString(), e);        }        boolean isThrowing = false;        buf.setLength(0); 
       try {            LOGGER.finest("Writing feed data to feed connection.");            // If there is an exception during this read/write, we do our            // best to close the url connection and read the result.            try {                outputStream.write(prefix);                feed.writeTo(outputStream);                outputStream.write(suffix);                outputStream.flush();            } catch (IOException e) {                LOGGER.log(Level.SEVERE,                        "IOException while posting: will retry later", e);                isThrowing = true;                throw new FeedException(e);            } catch (RuntimeException e) {                isThrowing = true;                throw e;            } catch (Error e) {                isThrowing = true;                throw e;            } finally {                try {                    outputStream.close();                } catch (IOException e) {                    LOGGER.log(                            Level.SEVERE,                            "IOException while closing after post: will retry later",                            e);                    if (!isThrowing) {                        isThrowing = true;                        throw new FeedException(e);                    }                }            }        } finally {            BufferedReader br = null;            try {                LOGGER.finest("Waiting for response from feed connection.");                InputStream inputStream = uc.getInputStream();                br = new BufferedReader(new InputStreamReader(inputStream,                        "UTF8"));                String line;                while ((line = br.readLine()) != null) {                    buf.append(line);                }            } catch (IOException ioe) {                if (!isThrowing) {                    throw new FeedException(ioe);                }            } finally {                try {                    if (br != null) {        
                br.close();                    }                } catch (IOException e) {                    LOGGER.log(Level.SEVERE,                            "IOException while closing after post: continuing",                            e);                }                if (uc != null) {                    uc.disconnect();                }                if (LOGGER.isLoggable(Level.FINEST)) {                    LOGGER.finest("Received response from feed connection: "                            + buf.toString());                }            }        }        return buf.toString();    }




转载请注明出处 博客园 刺猬的温驯

本人邮箱: chenying998179@163#com (#改为.)


