Using Hadoop to Create SOLR Indexes

One of the most challenging projects I faced at work recently was to create an Apache SOLR index consisting of approximately 15 million records. This index had been created once in the history of the company using a MySQL database and SOLR’s Data Import Handler (DIH). It had not been attempted since then because the original indexing process was time-consuming (12-14 hours), required human supervision, and on failure had to be restarted from the very beginning.

For small data sets (say, fewer than 100,000 records) SOLR’s DIH and MySQL work fine. However, for these larger sets it’s just too much of a drain on resources. Some members of our team and the architecture team had had success working with large data sets by leveraging the Apache Hadoop project. One of the most attractive aspects of Hadoop is that the processing is distributed, which should reduce the total time to index. Hadoop also has a robust fail-over system, which would remove the need for human supervision. We architected a data pipeline in which data would be processed by modules. When one module completed its task it would alert the system and the next module would begin work on the output of the previous module. The SOLR indexing is one module.

Many of the modules, including the SOLR indexing module, use the Hadoop Map/Reduce programming model to process the data over a distributed system. Having never used SOLR in this type of environment I had questions:

  • Can SOLR access the Hadoop file system?
  • Can SOLR indexing be applied in a distributed fashion?
  • Can we control where SOLR would write its indexes?

Can SOLR access the Hadoop file system?

It turns out that in its current state SOLR must read/write to a local file system.  There may be a way around this but we were fighting against time to prove that the pipeline is a viable (and cost effective) system. As such, there would have to be a mix of local file system and HDFS.

Can SOLR indexing be applied in a distributed fashion?

The mapping can, but the reducing cannot (at least not in this first version). As each mapper is fed data, it uses those data to generate an index. Each index generated by a mapper covers only a subset of the overall data set. During the cleanup process all of the smaller indexes generated by the mapper are copied to HDFS.

Since SOLR has to read/write to a local file system we could not distribute the reducer part of the process. The number of reducers was limited to 1 (using the setNumReduceTasks method in org.apache.hadoop.mapreduce.Job).  The reducer simply copied all the smaller indexes from the HDFS to a node’s local file system. On cleanup the smaller indexes were merged into one large index that would be the final product (using org.apache.lucene.index.IndexWriter).

Challenges

The first time we ran the indexer job we found that Hadoop would not allow the reducer to complete. Repeatedly, Hadoop would start a task and kill it after exactly 10 minutes. If Hadoop sees a task being idle for a certain amount of time it assumes something is wrong, kills the task, and starts another. The code did send Hadoop progress updates (using Context.progress()). However, the merge process performed by the IndexWriter takes longer than the configured timeout (the default value for mapred.task.timeout is 600000 milliseconds, i.e. 10 minutes). There is no way to put the merge process on hold to give Hadoop a status update. Once IndexWriter has been told to merge it goes off and returns only when it is done. The short-term solution to this was to increase the timeout from the default 600000 milliseconds to 2 hours (7200000 milliseconds).
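
A possible alternative to raising the timeout is to leave the merge alone and report progress from a second thread while the blocking call runs. The following is a minimal sketch of that idea, not the code from our pipeline. The class name ProgressHeartbeat and the method runWithHeartbeat are made up for illustration, and the progress call is passed in as a plain Runnable so that the example needs nothing beyond the JDK; in a real task it would wrap Context.progress(). It assumes that calling progress() from a helper thread is acceptable for the Hadoop version in use, which should be checked before relying on it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Runs a long blocking task while a background thread reports progress at a
 * fixed interval. All names here are hypothetical.
 */
final class ProgressHeartbeat {

  private ProgressHeartbeat() {
  }

  /**
   * @param blockingTask   the long-running call (for example the index merge)
   * @param reportProgress the keep-alive callback; in a Hadoop task this is
   *                       assumed to wrap Context.progress()
   * @param intervalMillis how often to report; should be well under the
   *                       configured task timeout
   */
  static void runWithHeartbeat(final Runnable blockingTask,
      final Runnable reportProgress, final long intervalMillis) {
    final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
          final Thread t = new Thread(r, "progress-heartbeat");
          t.setDaemon(true); // the heartbeat must never keep the JVM alive
          return t;
        });
    // Report immediately, then again every intervalMillis.
    scheduler.scheduleAtFixedRate(reportProgress, 0L, intervalMillis,
        TimeUnit.MILLISECONDS);
    try {
      blockingTask.run();
    }
    finally {
      // Stop reporting as soon as the task returns or throws.
      scheduler.shutdownNow();
    }
  }
}
```

In the reducer's cleanup this might be called as ProgressHeartbeat.runWithHeartbeat(() -> mergeIndexes(), context::progress, 60000L), where mergeIndexes() is a hypothetical method that wraps the IndexWriter merge and handles its own exceptions. The interval only needs to be comfortably shorter than mapred.task.timeout.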

Another issue had to do with being in a distributed environment. Since there could potentially be more than one mapper running on a given node, we had to make sure that the directory to which each mapper writes its index has a unique name; otherwise index files generated by one mapper could get overwritten by another. I decided to use the id of the map task as the unique name of the directory (the value of mapred.task.id obtained from the Configuration). But how do we tell SOLR to use this value for the index directory name?
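
Before getting to that, here is a rough sketch of the naming idea itself: take the task id string and turn it into a directory under the data directory. The class MapTaskIndexDirs and the method indexDirFor are hypothetical names, and the sample id in the usage note is only an illustrative value in the attempt-id style. In the mapper the real value would be read from the job Configuration under the key mapred.task.id. Replacing unexpected characters is an extra precaution that is assumed to be harmless, since the ids normally contain only letters, digits, and underscores.

```java
import java.io.File;

/**
 * Builds a per-task index directory so that two mappers on the same node
 * never write into the same place. All names here are hypothetical.
 */
final class MapTaskIndexDirs {

  private MapTaskIndexDirs() {
  }

  /**
   * @param dataDir   the local data directory shared by all mappers on a node
   * @param mapTaskId the value of mapred.task.id for the running task
   * @return a directory under dataDir named after the task id
   */
  static File indexDirFor(final String dataDir, final String mapTaskId) {
    if (mapTaskId == null || mapTaskId.isEmpty()) {
      throw new IllegalArgumentException("map task id is required");
    }
    // Keep only characters that are safe in a single directory name.
    final String safeName = mapTaskId.replaceAll("[^A-Za-z0-9_-]", "_");
    return new File(dataDir, safeName);
  }
}
```

For example, indexDirFor("/tmp/solr/data", "attempt_201110241058_0001_m_000003_0") yields a directory named attempt_201110241058_0001_m_000003_0 under /tmp/solr/data, and a second task on the same node gets a different directory because its id differs.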

Using the debugger I traced the process of how SOLR writes its index files. I found that SOLR uses the updateHandler class as defined by the solrconfig.xml to instantiate a SolrIndexWriter class which writes the indexes. The second argument to SolrIndexWriter is the path to the directory in which the index will be written.  The update handler, by default org.apache.solr.update.DirectUpdateHandler2, defines a createMainIndexWriter(final String name, final boolean removeAllExisting) method in which the SolrIndexWriter is instantiated. By extending DirectUpdateHandler2 and overriding the createMainIndexWriter method I am able to control where the indexes are written.

The trickier part is how to get the map task id value down into the createMainIndexWriter method. The only object I found that flows from the higher level, where the value of the map task id is available, down to the update handler is the SolrInputDocument. This means that:

  1. A field in the core’s schema.xml has to be defined: <field name="map_task_id"
    type="text" indexed="false" stored="false" required="false" multiValued="false" />
  2. When the SOLR document is generated in the mapper the value of the map task id must be added to the document
  3. After the update handler has fetched the value of the map task id field from the document
    the field can be removed from the document (unless you want that value to appear in the final index).

While in the update handler where do you get the SolrInputDocument? Good question. There is another method in org.apache.solr.update.DirectUpdateHandler2 that needs to be overridden: public int addDoc(final AddUpdateCommand cmd). The AddUpdateCommand object contains the SolrInputDocument which contains the value of the map_task_id field. In the overridden addDoc method assign the AddUpdateCommand received to a member variable (making sure to call super.addDoc(cmd) so that the parent class still processes the document).

 
  /**
   * The cmd contains the SolrInputDocument which will/should
   * have the map task id as one of its keys. It is this value
   * that is used to name the index directory.
   *
   * @param cmd
   *          the command object that contains the SolrInputDocument
   * @return the parent class' code does not document the return value
   * @throws IOException
   */
  @Override
  public int addDoc(final AddUpdateCommand cmd) throws IOException {
    this.cmd = cmd;
    return super.addDoc(cmd);
  }

Finally, in the createMainIndexWriter method get the SolrInputDocument from cmd.getSolrInputDocument() and use the value of the map_task_id field to create the directory name.

  /**
   * This method generates the new writer that will actually write the data to
   * the directory.
   *
   * @param name
   *          the name of the writer; in the parent it is the name of the class
   * @param removeAllExisting
   *          true means to create/overwrite existing; false means append
   * @return the writer
   * @throws IOException
   */
  @Override
  protected SolrIndexWriter createMainIndexWriter(final String name, final boolean removeAllExisting)
      throws IOException {
    final String indexDir = getNewIndexDir();
    final DirectoryFactory directoryFactory = getDirectoryFactory();
    final SolrIndexConfig mainIndexConfig = getMainIndexConfig();
    final IndexDeletionPolicyWrapper deletionPolicy = getDeletionPolicy();

    return new SolrIndexWriter("HadoopUpdateHandler", indexDir,
        directoryFactory, removeAllExisting, schema, mainIndexConfig,
        deletionPolicy);
  }

  /**
   * Generate the path to the index directory
   *
   * @return the path to the index directory, or null if it could not be created
   */
  private String getNewIndexDir() {
    String indexDirName = getDataDir() + getMapTaskIdDirectoryName();

    final File file = new File(indexDirName);
    if(!file.exists()) {
      indexDirName = file.getAbsolutePath();
      if(!createEmptyIndexDirectory(indexDirName)) {
        indexDirName = null;
      }
    }
    return indexDirName;
  }

  /**
   * From the cmd passed into addDoc fetch the SolrInputDocument. That
   * document should have the map task id. Grab that value then remove it from
   * the document (no need to index it).
   *
   * @return the name of the index directory ("index" is the default value if the
   *         map task id value cannot be obtained from the solr input document).
   */
  private String getMapTaskIdDirectoryName() {
    String directory = "index";
    if(cmd != null) {
      final SolrInputDocument doc = cmd.getSolrInputDocument();
      if((doc != null) && doc.containsKey(SolrConstants.FIELD_MAP_TASK_ID)
          && (doc.get(SolrConstants.FIELD_MAP_TASK_ID) != null)) {
        directory = doc.get(SolrConstants.FIELD_MAP_TASK_ID).getFirstValue().toString();
        doc.removeField(SolrConstants.FIELD_MAP_TASK_ID);
      }
    }
    return directory;
  }

48 thoughts on “Using Hadoop to Create SOLR Indexes”

  1. Hi,

    Great, and thanks for sharing. I have a doubt:

    You said “..During the cleanup process all of the smaller indexes generated by the mapper are copied to the HDFS….” I suppose the mapper’s context.write will write the output to HDFS anyway. So why would you want to do this?

    Also, to aggregate the small indexes, can’t we develop another MapReduce job which will read the small indexes created by the previous mappers and build one big index file? This will reduce stress on the cluster, as the map jobs just emit the records without any processing.

    If not confidential, can we have a look at your code?

    Let me know. Thanks.

    • There are 3 problems. The first is that SOLR can only read from/write to a local disk system, not HDFS. Because of this we have to “manually” move the indexes from local disk to HDFS (mapper cleanup) and back to local again (reducer).

      The second problem is that the index writing is not designed to be distributed. SOLR cannot write 1 index to multiple places (I don’t think it can be done in multiple threads either).

      The third is that the merging process, in addition to the above, is atomic. By this I mean that once it starts it cannot be interrupted without corrupting the index.

      Unfortunately, I’ve provided all the code I can (and it’s been watered down to remove confidential information).

      If I’ve misunderstood your question or if you know a way to distribute the creation of SOLR indexes I would be grateful to hear more.

  2. Hi, I’m not getting the following functions,

    final DirectoryFactory directoryFactory = getDirectoryFactory();
    final SolrIndexConfig mainIndexConfig = getMainIndexConfig();
    final IndexDeletionPolicyWrapper deletionPolicy = getDeletionPolicy();

    Moreover, what should be SolrConstants.FIELD_MAP_TASK_ID?

    Do I need to be in a particular package or something?

    Please help!

    • Thanks for your questions, JTheRocker.

      You’ll have to create those methods. All of them are convenience methods that depend on

          protected final SolrCore core;
      

      found in org.apache.solr.update.UpdateHandler being protected. We extend org.apache.solr.update.DirectUpdateHandler2 (which extends org.apache.solr.update.UpdateHandler).

      Here are examples:

          /**
           * core is protected in org.apache.solr.update.UpdateHandler from which your class will inherit.
           */
          private SolrCore getSolrCore() {
              return core;
          }
      
          /**
           * Fetch the directory factory from the SOLR core.
           */
          private DirectoryFactory getDirectoryFactory() {
              return getSolrCore().getDirectoryFactory();
          }
      
          /**
           * Fetch the main index configuration from the SOLR configuration (from the
           * SOLR core).
           */
          private SolrIndexConfig getMainIndexConfig() {
              return getSolrCore().getSolrConfig().mainIndexConfig;
          }
      
          /**
           * Fetch the index deletion policy from the SOLR core.
           */
          private IndexDeletionPolicyWrapper getDeletionPolicy() {
              return getSolrCore().getDeletionPolicy();
          }
      

      Regarding SolrConstants.FIELD_MAP_TASK_ID, we just created our own class of common constants for use in our application. In our SolrConstants class we have:

          public static final String FIELD_MAP_TASK_ID = "map_task_id";
      

      It is just a value we use in the class that extends org.apache.solr.update.DirectUpdateHandler2.

      I hope this helps.

  3. Hi Dan,

    It felt great that you replied. I would also like to know how I use the class after extending DirectUpdateHandler2. I mean, suppose I have a class called CustomUpdateHandler which extends DirectUpdateHandler2 and overrides the required functions; how would I go about using my CustomUpdateHandler class? Should I create an instance of it or should I deploy it in the Solr source library? I’m kind of confused. I know it’s a noob question to ask, but I still need the answer. So, please help!

    JTheRocker aka S Saikia

    • Glad to help. Once you have the class compiled into a jar you can deploy it to SOLR’s lib directory.

      In the conf/solrconfig.xml file is where the update handler is defined. Simply replace this

        <updateHandler class="solr.DirectUpdateHandler2">
        </updateHandler>
      

      with

        <updateHandler class="com.j.the.rocker.CustomUpdateHandler">
        </updateHandler>
      

      Restart SOLR so that it picks up the jar and the modified configuration file.

      Be sure to give the fully qualified class in the configuration file (I don’t think I have to mention it but I made up the package for your class above – of course, you’ll use whatever package you defined).

  4. Hi Dan,

    Forgive me for re-posting, but I would also like to know about DirectUpdateHandler2’s constructor, which I’m forced to call from my custom class while extending it.
    It takes a SolrCore object as an argument. So, in addition to my post above, I would also like to know what instance of SolrCore I need to pass to the constructor while instantiating the DirectUpdateHandler2 class?

    • You don’t instantiate the update handler yourself so just pass the core to super as such (this is exactly how ours is coded except for the class name)

        /**
         * Constructor called by the SOLR process. NOTE: The one with the SolrCore
         * argument is called. Not the no-argument constructor.
         *
         * @param core the core being updated
         * @throws IOException
         */
        public CustomUpdateHandler(final SolrCore core) throws IOException {
          super(core);
        }
      

      SOLR will instantiate for you the update handler defined in the conf/solrconfig.xml file.

      I hope that makes sense.

  5. Hi Dan,

    Everything went fine and I can see my custom folder inside the /data directory, but when I try to create an index in Solr using the Solr API I get this error: “exception in thread “main” org.apache.solr.common.SolrException: no segments* file found in org.apache.lucene.store.NIOFSDirectory@/home/jtherocker/softwares/apache-solr-3.4.0/example/solr/data/jtherocker lockFactory=org.apache.lucene.store.NativeFSLockFactory@115126e: files: [write.lock]’ ”

    I guess this is due to the following function in getNewIndexDir()
    createEmptyIndexDirectory(indexDirName). So how should I create the proper index using this function? Can you please give me the implementation of this method?

    • Sure thing.

        /**
         * The name of this method is somewhat a misnomer. The directory created by
         * this method is not empty. In the end it will contain initial segment files.
         * If there is no directory with those initial segment files SOLR will crap
         * out.
         *
         * @param indexDirectory path to the index directory
         * @return was the creation a success
         */
        private boolean createEmptyIndexDirectory(final String indexDirectory) {
          boolean indexDirectoryCreatedSuccessfully = true;
          // Create the index if it doesn't exist.
          if(!directoryExists(indexDirectory)) {
            log.warn(getSolrCore().getName() + " Solr index directory '" + indexDirectory + "' doesn't exist." + " Creating new index...");
      
            if(createDirectory(indexDirectory)) {
              SolrIndexWriter writer = null;
              try {
                // getDirectoryFactory(), getMainIndexConfig(), and getDeletionPolicy()
                // shown in previous comments above
                writer = new SolrIndexWriter("SolrCore.initIndex", indexDirectory,
                    getDirectoryFactory(), true, schema, getMainIndexConfig(),
                    getDeletionPolicy());
              }
              catch(final IOException e) {
                log.error(getSolrCore().getName() + " Trouble creating initial index in '" + indexDirectory + "'", e);
                indexDirectoryCreatedSuccessfully = false;
              }
              finally {
                if(writer != null) {
                  try {
                    writer.close();
                  }
                  catch(IOException e) {
                    // ignore
                  }
                }
              }
            }
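            else {
              log.warn(getSolrCore().getName() + " Solr index directory '" + indexDirectory + "' could not be created.");
              // Report the failure to the caller instead of returning true.
              indexDirectoryCreatedSuccessfully = false;
            }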
          }
          return indexDirectoryCreatedSuccessfully;
        }
      
      
        /**
         * Create just the directory - no contents.
         *
         * @param indexDirectory
         * @return was the creation a success
         */
        private boolean createDirectory(final String indexDirectory) {
          return new File(indexDirectory).mkdirs();
        }
      
  6. Hi, your code worked beautifully. I just need one final bit of help.
    I create an instance of the Solr server using CommonsHttpSolrServer inside my map() function in Hadoop. For instance, say I have a file of 83 MB (one flat text file) in HDFS (block size 64 MB). The number of mappers running will be 2, right? For every value (record) I get within the map() function, I split it into 7 fields to store in Solr. This whole simple process is taking a very long time to finish, even though one map task runs per HDFS block and the records total only around 12000. Do you have any idea what is happening? Please point out where I’m going wrong.

    Thank you very much again.

    • Off the top of my head, the first guess that comes to mind is this. Is a new instance of the SOLR server being created more than once in the mapper? When using the embedded SOLR it is like starting up SOLR from the command line as far as the startup process is concerned. That initial startup can take some time. If that is happening more than once in the mapper the process will take a long time.

      Let me know if this is not the case and I’ll look into it further.

      • So, here’s what I’m doing for an example in the mapper, just the algorithm is shown,

        public MyClass{
        public static MyClassMapper extends Mapper {
        static CommonsHttpSolrServer server;
        static {
        CommonsHttpSolrServer server =
        new CommonsHttpSolrServer();
        …..
        …..
        }

        void map(Object key, Text value, Context context) {
        //spliting value to some fields
        //create a solr doc
        //put those fields into solr
        ….
        ….
        server.add(myDoc);
        server.commit();
        }
        }
        }

        What I’m doing is creating a CommonsHttpSolrServer object inside the inner static Mapper class, not inside the map() function, as shown above. Please tell me if I’m making a mistake or if I need to follow a different approach to declaring that CommonsHttpSolrServer object.

        Thank you.

  7. Sorry for the errors above. I don’t have to re-declare ‘CommonsHttpSolrServer server’ again in the static block inside MyClass
    Here’s the code again,

    public class MyClass {
      public static class MyClassMapper extends Mapper {
        static CommonsHttpSolrServer server;
        static {
          server =
              new CommonsHttpSolrServer();
          …..
          …..
        }

        void map(Object key, Text value, Context context) {
          // splitting value into some fields
          // create a solr doc
          // put those fields into the solr doc
          ….
          ….
          server.add(myDoc);
          server.commit();
        }
      }
    }

    • I have a feeling that the primary bottleneck is in calling SolrServer#commit() in the Mapper#map(KEYIN, VALUEIN, Context) method. We put commit() in Mapper#cleanup(Mapper.Context) since cleanup(Mapper.Context) is called once at the end of the map job.
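
To make the difference concrete, here is a small stand-alone sketch of the commit-once pattern. It does not use the SOLR or Hadoop classes; IndexClient, CommitOnceMapper, and CountingIndexClient are hypothetical stand-ins invented for this example, with map() and cleanup() playing the roles of Mapper#map and Mapper#cleanup.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the two SolrServer calls that matter here. */
interface IndexClient {
  void add(String document);

  void commit();
}

/**
 * Mirrors the mapper lifecycle: map() runs once per record and cleanup()
 * runs once at the end of the task.
 */
final class CommitOnceMapper {
  private final IndexClient client;

  CommitOnceMapper(final IndexClient client) {
    this.client = client;
  }

  void map(final String record) {
    client.add(record); // add only; no commit per record
  }

  void cleanup() {
    client.commit(); // one commit for the whole task
  }
}

/** In-memory client that simply counts the calls it receives. */
final class CountingIndexClient implements IndexClient {
  final List<String> added = new ArrayList<>();
  int commits;

  @Override
  public void add(final String document) {
    added.add(document);
  }

  @Override
  public void commit() {
    commits++;
  }
}
```

Feeding 12000 records through map() and then calling cleanup() results in 12000 add calls and a single commit, instead of the 12000 commits that happen when commit() sits inside map().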

      We’re using the EmbeddedSolrServer which starts up Solr internally, and connects directly, all in a single JVM. The CommonsHttpSolrServer connects to an already running SOLR server over HTTP. Both extend SolrServer, however, since CommonsHttpSolrServer uses HTTP the process will incur all the overhead of an HTTP call (e.g., network latency, server processing). To avoid the extra overhead and increase performance you might consider switching to EmbeddedSolrServer.

      Note that if you do switch to EmbeddedSolrServer, make sure that SolrCore#close() is called after calling commit(). The close() method releases all the resources allocated by the core if they are no longer in use. Before doing this there was a bug in our system where the Hadoop job would end successfully but, since there were still resources open, the process continued to run. After running the job several times there were mapper processes piling up eating system resources and we began running out of memory. We could see the processes still running but figuring out why was tricky (strace and lsof on Linux are good friends of mine).

      Depending on your needs, you may also consider using SolrServer#optimize() in place of commit(). This method does everything that commit() does but it also merges all index segments into a single segment, which can make the search faster. This method can add more time to the index generation process. Depending on the size of the final segments, quite a bit of time. I mention it only as an option to explore once you have everything else working as you like.

      Hope this helps. Let me know how it goes.

  8. I have tried using the EmbeddedSolrServer. It worked fine when I tried it without MapReduce. But when I use it inside my MyClassMapper static inner class (shown below), it gives some sort of error.

    Here’s how I’m using EmbeddedSolrServer in my code.

    public class MyClass {
      public static class MyClassMapper extends Mapper {
        static EmbeddedSolrServer server;
        static {
          System.setProperty("solr.solr.home", "/tmp/example/solr");
          CoreContainer.Initializer initializer =
              new CoreContainer.Initializer();
          CoreContainer coreContainer = null;
          try {
            coreContainer = initializer.initialize();
          } catch (ParserConfigurationException e1) {
            e1.printStackTrace();
          } catch (SAXException e1) {
            e1.printStackTrace();
          }
          server = new EmbeddedSolrServer(coreContainer, "");
        }

        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

          server.add(myDoc);
        }

        /**
         * Called once at the end of the task.
         */
        @Override
        protected void cleanup(Mapper.Context context) {
          server.commit();
          SolrCore.getSolrCore().close();
        }
      }
    }

  9. The error I got was this in the hadoop task syslog

    org.apache.solr.common.SolrException: No such core:
    at org.apache.solr.client.solrj.embedded.EmbeddedSolrServer.request(EmbeddedSolrServer.java:104)
    at org.apache.solr.client.solrj.request.AbstractUpdateRequest.process(AbstractUpdateRequest.java:105)
    at org.apache.solr.client.solrj.SolrServer.add(SolrServer.java:64)
    at cocacola.mr.lucidSolr.SolrMRTry$Mapping.map(SolrMRTry.java:144)
    at cocacola.mr.lucidSolr.SolrMRTry$Mapping.map(SolrMRTry.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
    at org.apache.hadoop.mapred.Child.main(Child.java:264)
    2011-10-24 10:58:25,574 INFO org.apache.hadoop.mapred.Task: Runnning cleanup for the task

  10. Yup, I solved that issue. I have put all of the EmbeddedSolrServer setup inside the setup() function. For the ‘No such core:’ error, I passed the Solr core name when declaring ‘server = new EmbeddedSolrServer(coreContainer, “MyCoreName”)’. It worked! MyCoreName should be the same as the core name mentioned in solr.xml.
    Let’s see what comes next. I’m still working.

    Thanks for your help.

    • Glad you got it working. I downloaded the latest SOLR 1.4.0 and set it up (so that our SOLR configs, etc. would not interfere). I installed it configured as a non-multi-core version. For me, it worked using “” for the core name (in my example SOLR I have no solr.xml). Yes, the core names do need to match. Glad you found the issue. Good luck moving forward.

  11. Hi Dan,

    I would like to know how you use the reducer to copy the Solr index directories created by MapReduce from each slave node to a single node (let’s say the master node) in the reduce phase. I’m confused.

    Thank you.

    • Perhaps the write-up was misleading. The reducer does not copy the directories to a single node from all the other nodes.

      During the cleanup of the mapper the directories generated by the mapper are copied from the local file system to HDFS. HDFS is accessible by all nodes so no matter where Hadoop starts the one reducer it will have access to all those directories.

      Then when the reduce method is called in the reducer those directories are copied from HDFS to the local file system of that one node on which the reducer is running.

  12. Hi Dan,

    I have one more doubt regarding your article. You said that you copy the indexes from HDFS to a node’s local filesystem in the reducer. Aren’t there many reducer functions running in parallel throughout the cluster? I mean, you said that you’ve set the number of reducers to 1. If it’s really the case that only one reducer instance exists in the whole cluster, where will it run? I’m kind of confused.

    Please advise. Thank you.

    • Correct, we limit the number of reducers to 1. When you tell Hadoop that it can start one reducer it starts that reducer on a node and all the reducing is done on that node.

      Hadoop starts one reducer in the same way as it starts multiple reducers. The only difference is that it only starts one. I don’t know of a way to find out which node that reducer will run on.

  13. Hi Dan,
    Great! one more thing. Can’t we do both the copy and merge in reducer cleanUp() itself?

    Why use reduce() at all?

    • Yes, that is a possibility.

      We use reduce so that we get a little better idea of the progress of the job. When in the reduce phase Hadoop provides status on what percentage is complete. If memory serves correctly it doesn’t do that while in the cleanup phase.

  14. Hi Dan,

    Let’s say, we have more than 1000 map tasks running in a cluster. So, all the map tasks will generate more than 1000 index directories in HDFS. Assume, we copy all of them to a slave node and try to merge them together.
    I tried the above procedure, and it gave me a ‘too many open files’ exception in Solr while merging in the reduce cleanup. It goes fine if the number of directories I’m merging is limited. I tried setting ‘ulimit’ to unlimited but it was no use. So, have you come across this problem? Or did you choose some other approach for merging many directories (like batch merging)?

    Thank you!

    • No, we have never run into this situation. Setting a higher ulimit would have been my first suggestion.

      Seeing that didn’t work, I found this link: http://onjava.com/lpt/a/3273. In this article he describes a similar situation and a couple of different solutions (e.g., mergeFactor, maxMergeDocs, in-memory indexing). Should none of these work, the only other thing I can think of is, as you mentioned, merging as many directories as you can over and over in a batch-like process.
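
The batch-like process could be sketched roughly as follows. This is an illustration of the idea only, not something we have run: BatchMerger and mergeInBatches are hypothetical names, and the actual merge of one batch (which in our case goes through org.apache.lucene.index.IndexWriter) is left as a function supplied by the caller. The point is that no single merge ever opens more than batchSize directories at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Merges many inputs a limited number at a time. Names are hypothetical. */
final class BatchMerger {

  private BatchMerger() {
  }

  /**
   * Repeatedly merges groups of at most batchSize items until one remains.
   *
   * @param inputs     the items to merge (for example index directory paths)
   * @param batchSize  the most items handed to one merge; at least 2
   * @param mergeBatch merges one batch into a single item
   * @return the final merged item, or null if inputs is empty
   */
  static <T> T mergeInBatches(final List<T> inputs, final int batchSize,
      final Function<List<T>, T> mergeBatch) {
    if (batchSize < 2) {
      throw new IllegalArgumentException("batchSize must be at least 2");
    }
    List<T> current = new ArrayList<>(inputs);
    while (current.size() > 1) {
      final List<T> next = new ArrayList<>();
      for (int start = 0; start < current.size(); start += batchSize) {
        final int end = Math.min(start + batchSize, current.size());
        final List<T> batch = new ArrayList<>(current.subList(start, end));
        // A lone leftover needs no merge; carry it into the next round.
        next.add(batch.size() == 1 ? batch.get(0) : mergeBatch.apply(batch));
      }
      current = next;
    }
    return current.isEmpty() ? null : current.get(0);
  }
}
```

With 1000 directories and a batch size of 100, the first round performs 10 merges of 100 directories each and the second round performs one merge of the 10 intermediate results, for 11 merges in total.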

      One thing that dawned on me just now is that you could merge the directories generated by the mapper in the cleanup process and then copy that merged directory to HDFS. That would reduce the number of directories the reducer would have to work with.

      Let me know how you solved this problem. Since I’ve never come across it before it would be handy to understand how you solved it.

  15. Since, many map tasks will be running in parallel in a node and each map task running will have its own cleanup function, how would I merge the directories generated by the mappers in the cleanup after all the mappers complete? Is there any way, I could have a function for map part in MapReduce that will run only once for a slave node, after all the tasks complete?

    I’m now trying out the batch like merging, I’ll give you an update soon.

    Warm regards

  16. Hi Dan,

    I finally managed to do the trick. I used batch merging for the time being.
    And it’s working fine. But my index size is 3 times the size of the original file I used for indexing. And the process of copying from HDFS and merging took me over 1 hr. So, is there any way I could decrease that time? I mean, if the size of the index goes beyond, say, 50 GB, copying and merging will take a very long time. Furthermore, do you have any idea how I could show something like a progress bar, so that I could know how much has been copied and merged?

    • Glad it’s working for you! Sorry it’s taken so long to reply. I’ve had some health issues that have consumed a great deal of energy and time.

      Unfortunately, we’ve not run into this problem unexpectedly. That is, after experimentation we’ve found which indexes are going to be large relative to the other indexes generated.

      We have one index that is huge and we’ve sharded it (http://wiki.apache.org/solr/DistributedSearch). For this index there are 10 shards, each of which has an index that is about 30 GB. I wasn’t actively involved in this project so I won’t be of much help if you have any questions.

      As far as showing progress during the indexing, the only thing I can offer is to watch the Hadoop job tracker and/or monitor the files as they are being written. Normally Hadoop can report progress (e.g., TaskAttemptContext#progress()) but since the merge is a single blocking call you can only call progress() before and after it.

  17. Hi Dan,

    The batch merging thing worked. But while I was testing I got an index size of around 2.5 GB, which took a lot of time to copy and merge.

    Is there any alternative way to reduce the size? Have you implemented any way to get the progress of the copy and merge phase?

    Thank you.

    • Great news on the batch merging! 2.5 GB doesn’t seem very large. We regularly (daily) build indexes that are > 3 GB. I just checked one of them and the entire process takes less than 30 minutes (download data, parse, index, copy to hdfs, copy to 4 production SOLR machines). We’re running on a 30 node cluster (indexing used 34 mappers) and to avoid potential interferrence with other processes it is completely separate from the rest of our network.

      Perhaps if you can narrow down the bottleneck (e.g., disks are slow, network is slow due to heavy load) you’ll be able to adjust the environment to accommodate it.
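
      One simple way to narrow it down is to time each phase separately and see which one dominates. A minimal sketch, with hypothetical names throughout (PhaseTimer, the phase labels, and the sleeps that stand in for the real copy and merge work):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: records wall-clock time per named phase.
class PhaseTimer {
    // Elapsed milliseconds per phase, in the order phases were first timed.
    private final Map<String, Long> elapsedMillis = new LinkedHashMap<>();

    // Run the work and add its duration to the phase's total, even if it throws.
    void time(String phase, Runnable work) {
        long start = System.nanoTime();
        try {
            work.run();
        } finally {
            long ms = (System.nanoTime() - start) / 1_000_000;
            elapsedMillis.merge(phase, ms, Long::sum);
        }
    }

    long millisFor(String phase) {
        return elapsedMillis.getOrDefault(phase, 0L);
    }

    // Name of the phase with the largest total, or null if nothing was timed.
    String slowestPhase() {
        String slowest = null;
        long max = -1;
        for (Map.Entry<String, Long> e : elapsedMillis.entrySet()) {
            if (e.getValue() > max) {
                max = e.getValue();
                slowest = e.getKey();
            }
        }
        return slowest;
    }

    private static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        PhaseTimer timer = new PhaseTimer();
        // The sleeps are placeholders for the real copy and merge steps.
        timer.time("copy from HDFS", () -> pause(200));
        timer.time("merge", () -> pause(50));
        System.out.println("slowest phase: " + timer.slowestPhase());
    }
}
```

      If the copy dominates, the network or HDFS reads are the first things to look at; if the merge dominates, local disk speed is the more likely suspect.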

  18. Sorry, I didn’t know you were sick. Hope you are doing well now. Thanks for the help you gave. My project with Solr is kind of on hold for some reason. We’ll probably discuss more in the future. I’ll pray to God for your health. May you remain in good health in the future.

    My real name’s Sunayan Saikia and here’s my blog where I have drawn a little diagram of your approach above, if you don’t mind. :) I have kept a reference to your site at the introduction – sunayansaikia.blogspot.com

  19. Thanks, JTheRocker. I appreciate that. I am doing better (I go in for a CT scan tomorrow to make sure it’s actually gone).

    And big thanks for the mention on your blog! I’m glad I could be of help.

  20. Why not have a Solr server running on each node and index using the Solr REST API? Why do you need each map job to create its own index?

    • Thanks for the questions. You could certainly do it using the REST API. We’ve been running with this setup for over a year now and I cannot remember why the decision was made not to use the REST API. A couple of things come to mind, however.

      While I haven’t tested it, I would expect that using the REST API would take more time per request than using the embedded SOLR. There wouldn’t be much network overhead since it’s on the same box. For every request there is, however, the overhead of making an HTTP request and sending the request/response through the servlet framework, as well as the overhead of all the workings of an application server. This can add up when working with a large number of requests, as would be expected in a distributed environment.

      We also do quite a bit of custom SOLR coding both on the query end and the indexing end. It was easier to use the embedded SOLR to use this custom code than to use the REST API.

      I am not getting the gist of your second question. Allow me to explain what I understand and maybe that will help you see what I’m missing. Each node would have its own SOLR server running. Hadoop would send out data/mappers to various nodes. For instance, the mapper running on node 3 would make REST calls to SOLR running on node 3 and an index on node 3 would be built. At the same time another mapper would run on node 6 making REST calls to SOLR running on node 6 and an index on node 6 would be built. At the end you still end up with multiple indexes. How is that different? How did I misinterpret your second question?

  21. Hi Dan,

    Could you please explain how to append data to an existing Solr index? The index was generated by importing from a DB using the Data Import Handler (DIH), and now I want to append the new entries in the table to it. Could you please give me an example or code snippet for the above task? Thanks in advance.

  22. Hi Dan,

    I am currently looking at a use case that requires indexing huge amounts of data, and I am planning on using SolrCloud. If you have already explored it, would you still suggest using Hadoop Map/Reduce, or would SolrCloud by itself suffice for sharding/storing indexes?

    Thanks!

    • Hello and thank you for your question.

      I have not used SolrCloud yet. We are going to be starting a project for which we would like to use it but we are still in the early planning phases. You can be sure I’ll write something up here as I learn more about it.

      From reading about SolrCloud I understand that it will do what we need for large distributed indexing. However, since I haven’t worked with it personally I cannot recommend it with confidence.

      Best of luck to you. If you decide to implement a solution with SolrCloud I’d love to hear about your experiences (good, bad, ugly).

      Dan

  23. Hello Everyone,

    Cloudera has developed MapReduceIndexerTool (MRIT), meant especially for building a distributed Solr index and merging it into a live SolrCloud instance using the GoLive option in MRIT. It’s open source. Let me know if any of you have tried it. It’s a great tool, but I face an issue while merging the index into a remote SolrCloud which I use only for search.

    Great blog and very useful discussion everyone.

    Thanks,
    Nirmal
