Source: lib/child_manager.js

/*
Child_manager.js
@author Tilak Patidar



	*starter_lock
	*locks the starter so that setInterval does not start a new
	*run while the previous starter invocation has not finished

	*active_childs
	*keeps the count of currently active child processes
*/
var parent_dir = process.getAbsolutePath(__dirname);
var _ = require("underscore");
var check = require("check-types");
var Server = require(parent_dir + "/lib/server.js");
var Graph = require(parent_dir + "/lib/graph.js");
var child = require('child_process');
var fs = require("fs");
var ObjectId = require('mongodb').ObjectId;
var proxy_cache_class = require(parent_dir + '/lib/bucket_proxy.js');
var Lock = require(parent_dir + '/lib/lock.js');
var RSSFetcher = require(parent_dir + '/lib/rss.js');
var URL = require(parent_dir + "/lib/url.js");
var proto = require(parent_dir + '/lib/proto.js');
var JSONX = proto.JSONX;
var ObjectX = proto.ObjectX;
var BloomFilter = require('bloomfilter').BloomFilter;
var elasticsearch = require('elasticsearch');

/**
    Manages the workers owned by the bot and handles
    the communication and coordination between them.
    @author Tilak Patidar <tilakpatidar@gmail.com>
    @constructor
    @param {Message} message_obj

*/
var ChildManager = function(message_obj) {
    // Per-instance state shared by the closures defined below. Everything in
    // this section runs once, when the ChildManager is constructed.
    var message = message_obj;
    // NOTE(review): `cluster` is not referenced in the visible part of this file.
    var cluster = message.get('cluster');
    var config = message.get('config');
    var childs = config.getConfig("childs"); //childs to spawn
    // Inlinks pool size above which childFeedback flushes it to the db;
    // also passed to getNextBatch.
    var batchSize = config.getConfig("batch_size");
    var botObjs = message.get('botsObj'); //stores the robots.txt data
    // Elasticsearch client; only created below when "elasticsearch_server" is set.
    var es_client;



    /** Number of urls added to the bloom filter since it was last (re)created
        @private
    */
    var bloom_length = 0;

    /** Bloom filter n value: number of items the filter is sized for.
        childFeedback replaces the filter once bloom_length reaches 95% of this.
        @private
    */
    var BLOOM_N = 10000000;

    /** Bloom filter m value: number of bits in the filter (~34.28MB)
        @private
    */    
    var BLOOM_M = 287551752;

    /** Bloom filter k value: number of hash functions
        @private
    */    
    var BLOOM_K = 20;

    /**

        In-memory "url seen" test: urls reported through "addToPool" are only
        queued when the filter has not seen them, which keeps most duplicate
        urls (and the resulting insert errors) away from the db. A false
        positive means a new url is occasionally skipped.
        n
        Number of items in the filter
        p
        Probability of false positives, float between 0 and 1 or a number indicating 1-in-p
        m
        Number of bits in the filter
        k
        Number of hash functions
        n = 10,000,000, p = 1.0E-6 (1 in 1,000,000) → m = 287,551,752 (34.28MB), k = 20
        http://hur.st/bloomfilter

        @private
        @type {BloomFilter}
    */
    var bloom = new BloomFilter(
          BLOOM_M, // number of bits to allocate. //34.28MB
          BLOOM_K        // number of hash functions.
    );


    message.set('inlinks_pool', []);

    // Entered by nextBatch: the first batch answer releases 'bucket_creator_lock'.
    var first_time_lock = new Lock();
    /**
        Used as a queue, so that different domain group buckets are fetched from db for crawling.
        getNextBatch rotates it: the head group is used and moved to the back.
        @private
    */
    var prev_domain_grp = [];


    message.set('begin_intervals', true);
    // Worker slots in use: incremented by nextBatch, decremented when no bucket
    // is found or when a normal child closes.
    var active_childs = 0;
    var pool = message.get('pool'); //global db object in the entire app
    // Child processes keyed by bot id (and "tika"), kept for killing on cleanup.
    var spawned = {};
    // Per bot id: process handle, spawn time, bucket links and bucket hash.
    var bot_spawn_time = {};
    // NOTE(review): `log` is not referenced in the visible part of this file.
    var log = message.get('log');
    var bot_obj = message.get('bot');
    // `this` captured for use inside callbacks (e.g. that.flushInlinks).
    var that = this;
    var graph = new Graph(message);
    var server = new Server(message);
    server.start();
    // NOTE(review): this rebinds the module-level `URL` constructor to an
    // instance. Constructing a second ChildManager in the same process would
    // then call `new` on that instance. Confirm only one is ever created.
    URL = new URL(message);
    var rss_fetcher_obj = new RSSFetcher(message);

    // "elasticsearch_server" switches the client on; its options come from the
    // "elasticsearch" config key. `msg` is defined outside the visible part of
    // this file (presumably a hoisted logging helper).
    if(config.getConfig("elasticsearch_server")){

        var es_config = config.getConfig("elasticsearch");
        msg("Attempting to connect to elasticsearch " + JSON.stringify(es_config, null, 2) , "info");
        es_client = new elasticsearch.Client(es_config);
    
    }

    /**
        Responsible for allocating vacant childs.
        Runs on an interval, and after every normal child exits, to compute
        how many worker slots are free and request one bucket per free slot
        through nextBatch. The starter lock stays held until every nextBatch
        callback has fired, so overlapping invocations are skipped.
        @private
    */
    function starter() {
        if (!message.get('starter_lock').enter()) {
            msg("starter locked", "info");
            return;
        }

        msg("Check if new child available", "info");
        msg(("Current active childs " + active_childs), "info");
        var free_slots = childs - active_childs;

        // `!(x > 0)` also catches negative counts and NaN (e.g. a missing
        // "childs" config). The old `while (done !== 0)` never terminated for those.
        if (!(free_slots > 0)) {
            message.get('starter_lock').release();
            return; //no available childs
        }

        var callbacks_received = 0;
        for (var i = 0; i < free_slots; i++) {
            if (!message.get('webappOnly')) {
                //call nextBatch to obtain a new bucket and start a worker
                nextBatch(function nextBatchStarter() {
                    ++callbacks_received;
                    // release only once every requested batch has answered
                    if (callbacks_received === free_slots) {
                        msg("starter lock released", "success");
                        message.get('starter_lock').release();
                    }
                });
            } else {
                message.get('starter_lock').release();
            }
        }
    }


    /** True until getNextBatch first copies the allotted domain groups.
        @private
    */
    var first_clone = true;

    /**
        Claims the next due bucket and loads its links.
        Domain groups are taken round-robin from prev_domain_grp, so every
        allotted group gets a turn. A bucket is eligible when it belongs to the
        group, is not underProcess, and its recrawlAt has passed. findAndModify
        marks it underProcess for this bot. Buckets with no live (non-abandoned)
        links are removed.
        @private
        @param {Function} result - callback(err, docs, hash, refresh_label, domain_group).
            docs is [] and hash is null/undefined when no bucket was claimed.
        @param {Number} batchSize - not used by this function; kept for the call signature.
    */
    var getNextBatch = function getNextBatch(result, batchSize) {
        // Copy the allotted groups on first use, and again while the copy is
        // still empty, so groups allotted after the first call are picked up.
        if (first_clone || prev_domain_grp.length === 0) {
            prev_domain_grp = ObjectX.clone(message.get('alloted_domain_groups')) || [];
            first_clone = false;
        }

        if (prev_domain_grp.length === 0) {
            // Nothing allotted yet. Querying with `domains: undefined` would not
            // target a real domain group, so report an empty batch instead.
            result(null, [], null);
            return;
        }

        // rotate: take the head group and move it to the back of the queue
        var grp = prev_domain_grp.shift();
        prev_domain_grp.push(grp);

        var stamp1 = new Date().getTime();
        pool.bucket_collection.findAndModify({
            "domains": grp,
            "underProcess": false,
            "recrawlAt": {
                $lte: stamp1
            }
        }, [
            ['recrawlAt', 1],
            ['score', 1]
        ], {
            "$set": {
                "underProcess": true,
                "processingBot": config.getConfig("bot_name")
            }
        }, {
            "remove": false
        }, function getNextBatchFM(err1, object) {
            if (check.assigned(err1)) {
                // previously ignored silently; still reported to the caller as "no bucket"
                msg("getNextBatch findAndModify failed " + err1, "error");
            }
            if (!check.assigned(object) || !check.assigned(object.value)) {
                result(null, [], null);
                return;
            }

            var hash = object["value"]["_id"];
            var refresh_label = object["value"]["recrawlLabel"];
            pool.mongodb_collection.find({
                "bucket_id": hash,
                "abandoned": {
                    "$exists": false
                }
            }, {}, {}).toArray(function getNextBatchFind(err, docs) {
                if (check.assigned(err)) {
                    msg("getNextBatch", "error");
                    result(null, [], null, null, null);
                    return;
                }

                if (docs.length === 0) {
                    // Empty bucket (e.g. a re-added seed bucket whose links
                    // already exist): remove it so it is not claimed again.
                    pool.bucket_collection.removeOne({
                        "_id": ObjectId(hash)
                    }, function() {
                        msg("empty bucket removed ", 'success');
                    });
                }

                bot_obj.updateStats("processedBuckets", 1);
                msg(("Got " + docs.length + " for next Batch"), "success");
                result(err, docs, (hash + ""), refresh_label, grp);
            });
        });
    };

    /**
        Reserves one worker slot and asks getNextBatch for a bucket.
        When a non-empty bucket arrives a child is created for it; otherwise
        the slot is given back and the pending inlinks are flushed.
        The first answer ever received also releases 'bucket_creator_lock'.
        @private
        @param {Function} fn - invoked (without arguments) once the batch answer was handled
    */
    function nextBatch(fn) {
        // reserve the slot up-front; it is returned below if no bucket is found
        active_childs += 1;

        function onBatchFetched(err, results, hash, refresh_label, domain_group_id) {
            // the very first answer unblocks the bucket creator
            if (first_time_lock.enter()) {
                message.get('bucket_creator_lock').release();
            }

            var got_bucket = results.length !== 0 && check.assigned(hash);
            if (got_bucket) {
                msg("Got bucket " + hash, "info");
                createChild(results, hash, refresh_label, domain_group_id);
            } else {
                // no work: free the slot and persist pending inlinks meanwhile
                active_childs -= 1;
                that.flushInlinks(function() {});
            }
            fn();
        }

        getNextBatch(onBatchFetched, batchSize);
    }


    /**
        Fetches a batch from the failed queue and hands it to a failed-queue child.
        Runs on an interval. failed_batch_lock stays held while that child runs
        (its 'close' handler releases it), so at most one failed-queue batch is
        processed at a time. The lock is released here whenever nothing was fetched.
        @private
    */
    function nextFailedBatch() {
        if (!message.get('failed_batch_lock').enter()) {
            return;
        }

        pool.failed_db.find({
            "status": 0,
            "count": {
                "$lt": config.getConfig("retry_times_failed_pages")
            }
        }, {
            "limit": config.getConfig('failed_queue_size')
        }).toArray(function(err, docs) {
            // Nothing to process. A missing docs array used to leave the lock
            // held forever; it is now released as well.
            if (check.assigned(err) || !check.assigned(docs) || docs.length === 0) {
                message.get('failed_batch_lock').release();
                return;
            }

            // Match on the stored _id values themselves. Stringifying them
            // (as before) makes $in miss the ObjectIds the driver generated
            // for these documents.
            var ids = _.pluck(docs, '_id');

            // updateMany: the legacy update() without {multi: true} only
            // changed the first matching document.
            pool.failed_db.updateMany({
                "_id": {
                    "$in": ids
                }
            }, {
                "$set": {
                    "status": 1
                }
            }, function(e) {
                if (check.assigned(e)) {
                    // the batch is still processed, as before; only report the failure
                    msg("failed to mark failed queue batch as in progress", "error");
                }
                var li = [];
                _.each(docs, function(doc) {
                    // encode origin bucket, failed-queue id and retry count into bucket_id
                    doc["failed_info"]['bucket_id'] = 'failed_queue_' + doc["failed_info"]['bucket_id'] + '_' + doc["_id"] + '_' + doc["count"];
                    li.push(doc["failed_info"]);
                });
                createChild_for_failed_queue(li, 'failed_queue', 'failed_queue'); // sending failed_queue string instead of bucket hash
            });
        });
    }


    /**
        Spawns a new child process (spawn.js) for the failed queue.
        The child gets its task through an "init" IPC message. When it closes,
        pending inlinks are flushed, failed_batch_lock is released and
        nextFailedBatch() is run to pick up the next failed batch.
        @private
        @param {Object} bucket_links - Fetched batch by nextFailedBatch
        @param {String} hash - Batch hash id ('failed_queue' for this queue)
        @param {String} refresh_label - Fetch Interval of the batch
    */
    function createChild_for_failed_queue(bucket_links, hash, refresh_label) {
        msg('Starting failed_queue', 'info');
        var botId = new Date().getTime() + "" + Math.floor(Math.random() * 10000); //generate a random bot id
        var bot = child.fork(parent_dir + "/spawn.js", []);
        spawned[botId] = bot; //saving child process for killing later in case of cleanup
        bot_spawn_time[botId] = {
            'bot': bot,
            'spawn_time': new Date().getTime(),
            'bucket_links': bucket_links,
            'hash': hash
        };
        msg('Child process started ' + botId, "success");

        // An 'error' event without a listener is thrown, so a failed spawn or
        // IPC send would crash the manager. Log it instead, like startTika does.
        bot.on('error', function BotErrorFailedQueue(err) {
            msg(('Child process ' + botId + ' error: ' + err), "error");
        });

        var c = config.getGlobals(); // tuple of config,overriden_config,db_config
        var args = [bucket_links, bucket_links.length, message.get('links_store'), botObjs, hash, refresh_label, c[0], c[1], c[2], "failed_queue"];
        //not sending args with child process as char length limitation on bash

        //bot waits for this "init" msg which assigns the details of the task
        try {
            bot.send({
                "init": args
            });
        } catch (err) {
            console.log(err);
        }

        bot.on('close', function BotCloseFailedQueue(code) {
            //pushing the pool to db
            that.flushInlinks(function() {});
            if (code === 0) {
                msg(('Child process ' + botId + ' exited with code ' + code), "success");
            } else {
                msg(('Child process ' + botId + ' exited with code ' + code), "error");
            }
            message.get('failed_batch_lock').release();
            delete bot_spawn_time[botId];
            delete spawned[botId]; //delete from our record
            nextFailedBatch();
        });

        bot.on("message", childFeedback);
    }


    /**
        Spawns a new child process (spawn.js) for the normal queue.
        The child gets its task through an "init" IPC message. When it closes,
        pending inlinks are flushed, its worker slot is freed and starter() is
        run to refill the slot.
        @private
        @param {Object} bucket_links - Fetched batch by getNextBatch
        @param {String} hash - Batch hash id
        @param {String} refresh_label - Fetch Interval of the batch
        @param {*} domain_group_id - domain group the bucket was claimed from
    */
    function createChild(bucket_links, hash, refresh_label, domain_group_id) {
        var botId = new Date().getTime() + "" + Math.floor(Math.random() * 10000); //generate a random bot id
        var bot = child.fork(parent_dir + "/spawn.js", []);
        spawned[botId] = bot; //saving child process for killing later in case of cleanup
        bot_spawn_time[botId] = {
            'bot': bot,
            'spawn_time': new Date().getTime(),
            'bucket_links': bucket_links,
            'hash': hash
        };
        msg('Child process started ' + botId, "success");

        // An 'error' event without a listener is thrown, so a failed spawn or
        // IPC send would crash the manager. Log it instead, like startTika does.
        bot.on('error', function botError(err) {
            msg(('Child process ' + botId + ' error: ' + err), "error");
        });

        var c = config.getGlobals(); // tuple of config,overriden_config,db_config
        var args = [bucket_links, bucket_links.length, message.get('links_store'), botObjs, hash, refresh_label, c[0], c[1], c[2], "normal", domain_group_id];
        //not sending args with child process as char length limitation on bash

        //bot waits for this "init" msg which assigns the details of the task
        try {
            bot.send({
                "init": args
            });
        } catch (err) {
            console.log(err);
        }

        bot.on('close', function botClose(code) {
            //pushing the pool to db
            that.flushInlinks(function() {});

            if (code === 0) {
                msg(('Child process ' + botId + ' exited with code ' + code), "success");
            } else {
                msg(('Child process ' + botId + ' exited with code ' + code), "error");
            }

            active_childs -= 1;
            delete bot_spawn_time[botId];
            delete spawned[botId]; //delete from our record
            starter();
        });

        bot.on("message", childFeedback);
    }




    /**
        Handles an IPC message from any worker (spawn.js children or tika).
        The message type is the key left over once the "bot" sender tag is
        removed; messages with an unknown (or more than one) type are ignored.
        @private
        @param {Object} data - e.g. {"bot": "spawn", "insertRssFeed": [link.details.url, feeds]}
    */
    function childFeedback(data) {
        var sender = data["bot"];
        var kind = Object.keys(data).join(" ").replace("bot", "").trim();

        if (kind === "insertTikaQueue") {
            // only tika work reported by a crawler child is queued
            if (sender === "spawn") {
                pool.insertTikaQueue(data["insertTikaQueue"]);
            }
        } else if (kind === "insertRssFeed") {
            pool.insertRssFeed(data["insertRssFeed"]);
        } else if (kind === "insertAuthor") {
            pool.insertAuthor(data["insertAuthor"]);
        } else if (kind === "setCrawled") {
            that.setCrawled(data["setCrawled"]); //mark as crawled
        } else if (kind === "addToPool") {
            queueInlink(data["addToPool"]);
            // checked even when the url was dropped as a duplicate
            if (message.get('inlinks_pool').length > batchSize) {
                that.flushInlinks(function() {});
            }
        } else if (kind === "finishedBatch") {
            var batch = data["finishedBatch"];
            // batches of bot type "failed_queue" have no bucket to close
            if (batch[2] !== "failed_queue") {
                pool.batchFinished(batch[0], batch[1], function() {}, true); //set batch finished
            }
        } else if (kind === "tikaPID") {
            message.set('tikaPID', data["tikaPID"]);
            //log the pid so that if bot is started again it can kill a old instance of tika server
            fs.writeFileSync(parent_dir + "/db/pids/tikaPID.txt", data["tikaPID"] + "");
        } else if (kind === "graph") {
            graph.insert(data["graph"][0], data["graph"][1]);
        }

        /**
            Pushes a [url, ...] entry into the inlinks pool unless the bloom
            filter reports the url as probably seen. The filter is swapped for
            an empty one once it holds 95% of the BLOOM_N urls it was sized for.
        */
        function queueInlink(entry) {
            var link = entry[0];

            if (bloom_length >= (BLOOM_N * 95 / 100)) {
                bloom = new BloomFilter(
                    BLOOM_M, // number of bits to allocate. //34.28MB
                    BLOOM_K  // number of hash functions.
                );
                bloom_length = 0;
            }

            if (bloom.test(link)) {
                return; // probably a duplicate
            }
            bloom.add(link);
            bloom_length += 1;
            message.get('inlinks_pool').push(entry);
        }
    }

    /**
        Fetches one due rss feed and adds its accepted links to the inlinks pool.
        A feed is due when its nextRefresh lies in the past and it belongs to
        one of this bot's allotted domain groups. After fetching, its
        nextRefresh is pushed one day ahead. Runs in a setInterval and is
        guarded by rss_updator_lock.
        @private
    */
    function rss_links_updator() {
        function unlock() {
            message.get("rss_updator_lock").release();
        }

        if (!message.get("rss_updator_lock").enter()) {
            return;
        }

        try {
            pool.rss_feeds.findOne({
                "nextRefresh": {
                    "$lt": new Date().getTime()
                },
                "domains": {
                    "$in": message.get('alloted_domain_groups')
                }
            }, function onDueFeed(err, feed) {
                if (check.assigned(err) || !check.assigned(feed)) {
                    unlock();
                    return;
                }

                var feed_id = feed["_id"];
                rss_fetcher_obj.getLinks(feed_id, function onFeedLinks(li) {
                    var one_day_ms = 86400000;
                    pool.rss_feeds.updateOne({
                        "_id": feed_id
                    }, {
                        "$set": {
                            "nextRefresh": new Date().getTime() + one_day_ms
                        }
                    }, function onFeedRescheduled() {
                        var accepted = [];
                        _.each(li, function(item) {
                            var link = URL.url(item);
                            if (link.isAccepted()) {
                                // [url, domain, parent] as read by flushInlinks
                                accepted.push([link.details.url, link.details.domain, feed_id]);
                            }
                        });

                        message.set('inlinks_pool', message.get('inlinks_pool').concat(accepted));

                        msg("links " + accepted.length + " obtained from rss " + feed_id, "success");
                        unlock();
                    });
                });
            });
        } catch (e) {
            console.log(e, "rss_links_updator");
            unlock();
        }
    }

    /**
        Forks the tika.js child that serves pdf requests, when the "tika"
        config flag is set. The child is registered as spawned["tika"],
        receives the globals and links_store through an "init" IPC message,
        and reports back through childFeedback.
        @private
    */
    function startTika() {
        if (!config.getConfig("tika")) {
            return;
        }

        var tika = child.fork(parent_dir + "/tika.js", []);
        msg("Tika server forked", "success");

        var globals = config.getGlobals();
        var init_payload = [globals[0], globals[1], globals[2], message.get('links_store')];
        tika.send({
            "init": init_payload
        });
        spawned["tika"] = tika;

        tika.on("message", childFeedback);
        tika.on("error", function() {
            console.log(arguments);
        });
        tika.on('close', function tika_close(code) {
            if (code === 0) {
                return;
            }
            msg("Tika port occupied maybe an instance is already running ", "error");
        });
    }


    if (!message.get('webappOnly')) {
        //starting child process for tika
        startTika();

        var restarter_locked = new Lock();

        /**
            Kills the given tika child (best effort) and forks a new one on the next tick.
            @private
        */
        var restartTika = function restartTika(tika) {
            try {
                tika.kill();
            } catch (ee) {
                console.log(ee);
            } finally {
                process.nextTick(startTika);
            }
        };

        // Health check of the tika child every 20s; restarts it when it looks dead.
        var tika_restarter = setInterval(function tika_restarter() {
            if (!restarter_locked.enter()) {
                return;
            }

            var tika = spawned["tika"];
            if (tika) {
                msg("Check if tika server alive", 'info');
                if (tika.connected === false) {
                    // On current Node versions send() on a closed IPC channel does
                    // not throw (the error is emitted asynchronously), so the
                    // try/catch below alone never noticed a dead tika process.
                    msg("Tika server could be dead", 'error');
                    restartTika(tika);
                } else {
                    try {
                        tika.send({
                            "ping": true
                        });
                        msg("Tika server alive", 'success');
                    } catch (e) {
                        console.log(e);
                        //could be dead, restart
                        msg("Tika server could be dead", 'error');
                        restartTika(tika);
                    }
                }
            }
            restarter_locked.release();
        }, 20000);

        var a = setInterval(starter, 2000);
        var b = setInterval(nextFailedBatch, 15000);
        var rss = setInterval(rss_links_updator, 1000);
        //var c1 = setInterval(botKiller,config.getConfig('child_timeout'));
        // rss is registered as well. It used to be left out of my_timers, so
        // (presumably) code clearing my_timers on shutdown never stopped it.
        message.set('my_timers', message.get('my_timers').concat([a, b, tika_restarter, rss]));
    }


    /**
        Reports how many worker slots are currently in use, i.e. children
        running or whose bucket is still being fetched.
        @public
        @returns {Number} the active_childs counter
    */
    this.getActiveChilds = function getActiveChilds() {
        var in_use = active_childs;
        return in_use;
    };

    /**
        Tells whether the starter lock is currently held.
        @public
        @returns {*} whatever the starter lock's isLocked() reports
    */
    this.isManagerLocked = function isManagerLocked() {
        var starter_lock = message.get('starter_lock');
        return starter_lock.isLocked();
    };

    /**
        Locks or unlocks the starter lock used by the interval-driven starter.
        @public
        @param {boolean} state - true to lock, false to unlock
        @returns {*} whatever the starter lock's setLocked() returns
    */
    this.setManagerLocked = function setManagerLocked(state) {
        var starter_lock = message.get('starter_lock');
        return starter_lock.setLocked(state);
    };


    /**
        Drains the shared 'inlinks_pool' and bulk-inserts its links into
        pool.mongodb_collection as new, not-yet-bucketed url documents.
        Called when the pool grows past batchSize, when no bucket is available,
        after every child closes, and in case of clean up.

        Each pool entry is [url, domain, parent, fetch_interval?].
        - When fetch_interval is missing, it is taken from the domain's entry in
          message.get('links_store').
        - Entries whose domain has no links_store entry are dropped.
        - A still-missing interval falls back to "default_recrawl_interval".
        - Entries without a string url are skipped.
        @public
        @param {Function} [fn] - callback. Called with `false` when the pool was
            empty, otherwise with no arguments. insertMany errors (for example
            duplicate urls; the insert is unordered) are only logged.
    */
    this.flushInlinks = function flushInlinks(fn) {
        if (!check.assigned(fn)) {
            fn = function(){};
        }

        // Depth of a url's path. The scheme is stripped first, so http:// and
        // https:// urls get the same level. Only "http://" used to be removed,
        // which made every https url two levels deeper.
        function urlLevel(url) {
            return url.replace(/^https?:\/\//, '').split('/').length;
        }

        var li = message.get('inlinks_pool');
        // swap in a fresh pool before the async insert so links queued meanwhile are kept
        message.set('inlinks_pool', []);

        if (li.length === 0) {
            //buckets with empty links will not be inserted
            fn(false);
            return;
        }

        var bot_name = config.getConfig('bot_name');
        var inserted_docs = [];
        for (var i = 0; i < li.length; i++) {
            var entry = li[i];
            var url = entry[0];
            var domain = entry[1];
            var parent = entry[2];

            if (typeof url !== 'string') {
                // A malformed entry used to throw in url.replace, aborting the
                // whole flush after the pool had already been emptied.
                continue;
            }

            var refresh_time;
            try {
                if (check.assigned(entry[3])) {
                    refresh_time = entry[3];
                } else {
                    refresh_time = message.get('links_store')[domain]["fetch_interval"];
                }
            } catch (err) {
                //refresh time not found in seed list, maybe other domain links from rss module
                continue;
            }

            if (!check.assigned(refresh_time)) {
                refresh_time = config.getConfig("default_recrawl_interval");
            }

            inserted_docs.push({
                '_id': ObjectId(),
                "url": url,
                "bucketed": false,
                "partitionedBy": bot_name,
                "domain": domain,
                "parent": parent,
                "data": "",
                "bucket_id": null,
                "fetch_interval": refresh_time,
                "level": urlLevel(url),
                // placeholder md5 built from a fresh ObjectId, so it differs per doc
                "md5": ObjectId().toString() + "fake" + parseInt(Math.random() * 10000)
            });
        }

        if (inserted_docs.length === 0) {
            return fn();
        }

        pool.mongodb_collection.insertMany(inserted_docs, {
            ordered: false
        }, function insertLinksMany(err) {
            if (check.assigned(err)) {
                msg("error in insertMany, may be some duplicate urls discovered", "error");
            } else {
                msg("Inlinks flushed in db", "success");
            }
            fn();
        });
    };


    this.setCrawled = function setCrawled(link_details, fn) {
        if (!check.assigned(fn)) {
            fn = function() {};
        }

        var url = link_details.url;
        var urlID = link_details.urlID;
        var data = link_details.parsed_content;
        var status = link_details.status_code;
        //console.log("############# RESPONSE STATUS ########### ",status);
        var stamp1 = new Date().getTime();
        var redirect_url = link_details.redirect;
        var response_time = link_details.response_time;
        var canonical_url = link_details.canonical_url;
        var alternate_urls = link_details.alternate_urls;
        var header_content_type = link_details.header_content_type;
        var md5 = link_details.content_md5;
        //console.log(link_details,"############FOR UPDATE#############");
        if (!check.assigned(data)) {

            data = "";
        }
        //#debug#console.log(status)
        if (!check.assigned(status)) {
            status = "0"; //no error
        }

        var add_docs = [];

        var dict = {
            "bucketed": true,
            "url": url,
            "data": data,
            "response": status,
            "lastModified": stamp1,
            "updatedBy": config.getConfig("bot_name")
        };
        var from_failed_queue = false;
        var abandoned = false;
        var failed_count = 0;
        var failed_id = 0;

        if (check.assigned(md5)) {
            dict["md5"] = md5;
        }

        if (check.assigned(header_content_type)) {
            dict["header_content_type"] = header_content_type;
        }

        if (check.assigned(alternate_urls) && !check.emptyObject(alternate_urls)) {
            dict["alternate_urls_lang"] = alternate_urls;

        }


        if (check.assigned(response_time)) {

            dict["response_time"] = response_time;
        }


        if (check.assigned(link_details.bucket_id) && !link_details['normal_queue']) {

            from_failed_queue = true;
            failed_count = parseInt(link_details.bucket_id.replace('failed_queue_', '').split('_').pop()) + 1;
            failed_id = parseInt(link_details.bucket_id.replace('failed_queue_', '').split('_')[1]);
            //console.log(link_details.url+' from failed_queue',failed_id,'\t',failed_count);
        }


        if (data === "" || status === 'ETIMEDOUT_CALLBACK' || status === 'ETIMEDOUT_CONNECTION' || status === 'ETIMEDOUT_READ' || status === -1) {
            //if 4xx or 5XX series status code then add to failed queue
            if (from_failed_queue) {
                //then check the count
                if (failed_count >= config.getConfig("retry_times_failed_pages")) {
                    dict["abandoned"] = true;
                    abandoned = true;
                    //if so mark abandoned and delete from queue
                    (function(failed_id, url) {
                        /*
                        failed_db.parallelize(function() {
                            failed_db.run("DELETE FROM q WHERE id=?",[failed_id],function delete_from_failed_queue(e,r){
                                    
                                    
                                    //console.log(e,failed_id,'marked abandoned');
                                    msg('Deleted from failed queue and abandoned'+url,'info');

                            });
                        });
                        */
                        pool.failed_db.removeOne({
                            "_id": failed_id
                        }, function delete_from_failed_queue(e, r) {
                            //console.log(e,failed_id,'marked abandoned');
                            msg('Deleted from failed queue and abandoned' + url, 'info');

                        })
                    })(failed_id, link_details.url);

                } else {
                    //inc by one and status = 0
                    (function(url, failed_id) {

                        /*
                            failed_db.parallelize(function() {
                                failed_db.run("UPDATE q SET count = count+1, status=0 WHERE id=?",[failed_id],function failed_retry_pushed(e,r){
                                    //console.log('counter increased ',failed_id);
                                    msg('Pushed again to retry in failed queue '+url,'info');
                                }); 
                            });
                        */
                        pool.failed_db.updateOne({
                            "_id": failed_id
                        }, {
                            "$set": {
                                "status": 0
                            },
                            "$inc": {
                                "count": 1
                            }
                        }, function failed_retry_pushed(e, r) {
                            //console.log('counter increased ',failed_id);
                            msg('Pushed again to retry in failed queue ' + url, 'info');
                        });

                    })(link_details.url, failed_id);

                }
            } else {

                dict['abandoned'] = false;
                (function(link_details) {
                    /*
                    failed_db.parallelize(function() {
                        failed_db.run("INSERT OR IGNORE INTO q(failed_url,failed_info,status,count) VALUES(?,?,0,0)",[link_details.url,JSON.stringify(link_details)],function insertFailed(err,row){
                            //console.log(err,row);
                            msg("Inserted failed url "+link_details.url+" into failed queue", 'success');
                        });
                    });
                    */
                    pool.failed_db.insert({
                        "failed_url": link_details.url,
                        "failed_info": link_details,
                        "status": 0,
                        "count": 0
                    }, function insertFailed(err, row) {
                        //console.log(err,row);
                        msg("Inserted failed url " + link_details.url + " into failed queue", 'success');
                    });
                })(link_details);
                return fn();
            }
        } else if ((status + "").indexOf("EMPTY_RESPONSE") >= 0) {
            //do not retry reject 
            dict["abandoned"] = true;
            dict["md5"] = ObjectId().toString() + "fake" + parseInt(Math.random() * 10000);
            abandoned = true;
            delete dict["response_time"];

            //if so mark abandoned 
            bot_obj.updateStats("failedPages", 1);
            msg('Abandoned due to empty response ' + url, 'info');
        } else if (status === "MimeTypeRejected") {
            //do not retry reject 
            dict["abandoned"] = true;
            abandoned = true;
            dict["md5"] = ObjectId().toString() + "fake" + parseInt(Math.random() * 10000);
            delete dict["response_time"];

            //if so mark abandoned 
            bot_obj.updateStats("failedPages", 1);
            msg('Abandoned due to mime type rejection ' + url, 'info');
        } else if ((status + "").indexOf("NOINDEX") >= 0) {
            //do not retry reject 
            dict["abandoned"] = true;
            abandoned = true;
            delete dict["response_time"];
            dict["md5"] = ObjectId().toString() + "fake" + parseInt(Math.random() * 10000);
            //if so mark abandoned 
            bot_obj.updateStats("failedPages", 1);
            msg('Abandoned due to no INDEX from meta ' + url, 'info');
        } else if (status === "ContentTypeRejected") {
            //do not retry reject 
            dict["abandoned"] = true;
            abandoned = true;
            dict["md5"] = ObjectId().toString() + "fake" + parseInt(Math.random() * 10000);
            delete dict["response_time"];
            //if so mark abandoned 
            bot_obj.updateStats("failedPages", 1);
            msg('Abandoned due to content type rejection ' + url, 'info');
        } else if (status === "ContentLangRejected") {
            //do not retry reject 
            dict["abandoned"] = true;
            abandoned = true;
            dict["md5"] = ObjectId().toString() + "fake" + parseInt(Math.random() * 10000);
            delete dict["response_time"];
            //if so mark abandoned 
            bot_obj.updateStats("failedPages", 1);
            msg('Abandoned due to content lang rejection ' + url, 'info');
        } else {

            //link is from failed_queue and is successfull now
            if (from_failed_queue) {
                (function(url, failed_id) {
                    /*
                        failed_db.parallelize(function() {
                            failed_db.run("DELETE FROM q WHERE id=?",[failed_id],function failed_success(err,row){
                                
                                msg(url+' from failed_queue is successfull now','info');
                            });

                        });
                    */
                    pool.failed_db.removeOne({
                        "_id": failed_id
                    }, function failed_success(err, row) {

                        msg(url + ' from failed_queue is successfull now', 'info');
                    });

                })(link_details.url, failed_id);
            }


            dict["crawled"] = true; //final marker for crawled page



        }


        if (check.assigned(redirect_url)) {
            dict["redirect_url"] = redirect_url;
        }
        //console.log(dict);

        if (check.assigned(canonical_url) && canonical_url !== url) {
            //if both urls are same then no need to mark abandoned

            //if we are getting canonical_url val this means page was successfull and was parsed
            var new_dict = JSON.parse(JSON.stringify(dict));
            delete dict["crawled"]; //remove crawled marker
            dict["md5"] = ObjectId().toString() + "fake" + parseInt(Math.random() * 10000);
            new_dict['url'] = canonical_url;
            new_dict["alternate_urls"] = [url];
            new_dict["bucket_id"] = link_details.bucket_id;
            pool.mongodb_collection.findOne({
                "url": canonical_url
            }, function(err, doc) {
                //console.log("Find canonical_url ",err,doc," find canonical_url");
                if (check.assigned(err) || !check.assigned(doc)) {
                    //canonical url not present
                    //  console.log("canonical not present");
                    pool.mongodb_collection.insert(new_dict, function(err1, res1) {
                        //  console.log(arguments);
                        //insert the new canonical url in the same bucket
                        //console.log("Insert canonical_url ",err1,res1," insert canonical_url");

                    });
                } else {
                    //url already present update it
                    var aul = [];
                    if (check.assigned(alternate_urls) && !check.emptyObject(alternate_urls)) {
                        aul = alternate_urls;
                    }
                    pool.mongodb_collection.updateOne({
                        "_id": doc["_id"]
                    }, {
                        "$push": {
                            "alternate_urls": url
                        },
                        "$pushAll": {
                            "alternate_urls_lang": alternate_urls
                        }
                    }, function(e, r) {
                        //  console.log("update canonical_url ",e,r," update canonical_url");


                    });
                }
            });


            //now changes to this page
            dict["data"] = "";
            dict["abandoned"] = true;
            abandoned = true;
            dict["response"] = dict["response"] + "_CANONICAL_PAGE_EXISTS";
            dict["canonical_url"] = canonical_url;
        }

        //console.log(dict);
        try {
            var html_data = link_details.content;
        } catch (err) {
            var html_data = "";
        }
        try {
            delete dict['data']['_source']['html'];

        } catch (err) {

        }

        pool.mongodb_collection.updateOne({
            "_id": ObjectId(urlID)
        }, {
            $set: dict
        }, function updateDoc(err, results) {

            if (err) {
                if (err.code === 11000) {
                    if (err.errmsg.indexOf("$md5_1 dup key") >= 0) {

                        if (link_details.status_code === 404) {
                            bot_obj.updateStats("failedPages", 1);
                            msg("Duplicate page found but 404 thus skipping .", 'info');
                            dict["abandoned"] = true;
                            delete dict["crawled"];
                            delete dict["data"];
                            dict["response"] = "404_SAME_MD5_PAGE_EXISTS";
                            dict["md5"] = md5 + "md5#random#" + parseInt(Math.random() * 10000) + "" + parseInt(new Date().getTime());
                            pool.mongodb_collection.updateOne({
                                "_id": ObjectId(urlID)
                            }, {
                                $set: dict
                            }, function(err, results) {

                            });
                        } else {
                            msg("Duplicate page found", "info");
                            //a similar page exists
                            //then update this link into it and abondon this by setting a canonial link

                            pool.mongodb_collection.updateOne({
                                "md5": md5
                            }, {
                                "$push": {
                                    "alternate_urls": url
                                },
                                "$pushAll": {
                                    "alternate_urls_lang": alternate_urls
                                }
                            }, function(e, r) {
                                //  console.log("update canonical_url ",e,r," update canonical_url");


                            });
                            bot_obj.updateStats("failedPages", 1);
                            dict["abandoned"] = true;
                            delete dict["crawled"];
                            delete dict["data"];
                            dict["response"] = "SAME_MD5_PAGE_EXISTS";
                            dict["md5"] = md5 + "md5#random#" + parseInt(Math.random() * 10000) + "" + parseInt(new Date().getTime());
                            pool.mongodb_collection.updateOne({
                                "_id": ObjectId(urlID)
                            }, {
                                $set: dict
                            }, function(err, results) {

                            });
                        }

                    }
                } else {
                    msg("pool.setCrawled", "error");
                    console.log(err);
                }

            } else {
                if (!abandoned && dict["response"] !== "inTikaQueue") {
                    //console.log(html_data, typeof html_data);
                    if (typeof html_data !== 'string') {
                        //this is the case due to some errors
                        //in this case skip the page
                        html_data = "";
                    }


                    //var crypto = require('crypto');
                    //var md5sum = crypto.createHash('md5');
                    //md5sum.update(html_data);
                    //var hash = md5sum.digest('hex');
                    //console.log(ObjectId(urlID), "   ", hash);

                    if (html_data.length !== 0) {

                        var gridStore = new pool.GridStore(pool.db, ObjectId(urlID), ObjectId(urlID) + "_data.html", "w");
                        gridStore.open(function(err, gridStore) {
                            gridStore.write(html_data, function(err, gridStore) {
                                gridStore.close(function(err, fileData) {
                                    bot_obj.updateStats("crawledPages", 1);
                                    //console.log('hey1');
                                    elasticIndex(dict);
                                    msg(("Updated " + url), "success");
                                    
                                });
                            });
                        });

                    } else {
                        bot_obj.updateStats("crawledPages", 1);
                        //console.log('hey12');
                        elasticIndex(dict);
                        msg(("Updated " + url), "success");
                        
                    }

                }
                if (abandoned) {
                    bot_obj.updateStats("failedPages", 1);
                }


            }

        });
        return fn();
    };

    /**
        Indexes a crawled page's parsed document into Elasticsearch,
        only when "elasticsearch_server" is enabled in the config.

        The indexed body is dict.data._source, augmented with the page's
        url, response and response_time. Its "id" field becomes the
        Elasticsearch document id and is removed from the body, together
        with "rss_feeds". Note: this mutates dict ("crawled" is deleted)
        and dict.data._source in place.

        Records without a dict.data._source object (dict.data may be ""
        or missing) are skipped with an info message instead of throwing
        a TypeError inside the asynchronous callback that invoked us.
        @private
        @param {JSON} dict - crawl record as built by setCrawled
    */
    function elasticIndex(dict) {
        if (!config.getConfig("elasticsearch_server")) {
            return;
        }

        var data = dict["data"];
        if (!data || typeof data !== "object" || !data["_source"] || typeof data["_source"] !== "object") {
            msg("No parsed document to index into elasticsearch for " + dict["url"], "info");
            return;
        }

        var source = data["_source"];
        var id = source["id"];
        // id is passed separately as the ES document id; rss_feeds is not indexed
        delete source["id"];
        delete source["rss_feeds"];
        delete dict["crawled"];

        source["url"] = dict["url"];
        source["response"] = dict["response"];
        source["response_time"] = dict["response_time"];

        var indexName = config.getConfig('elasticsearch', 'index');
        es_client.index({
            index: indexName,
            type: config.getConfig('elasticsearch', 'type'),
            body: source,
            id: id
        }, function es_client_cb(error, response) {
            if (error) {
                console.log(error);
            } else {
                msg("updated " + id + " into elasticsearch index '" + indexName + "'", "success");
            }
        });
    }
    /**
        Kill the workers spawned by the child manager.

        Before killing, every entry with status 1 in pool.failed_db and in
        pool.tika_queue is reset to status 0 (presumably "handed out but not
        finished", so it is picked up again on the next start — TODO confirm).
        $set is required here: an update document without operators is a
        whole-document replacement in MongoDB and would erase the entry's
        other fields. multi: true resets every matching entry, not just the
        first one. Reset errors are logged but do not prevent the kill.
        @public
        @param {Function} fn - callback, invoked after kill() was sent to every spawned worker
    */
    this.killWorkers = function killWorker(fn) {
        pool.failed_db.update({
            "status": 1
        }, {
            "$set": {
                "status": 0
            }
        }, {
            multi: true
        }, function resetFailedQueue(err) {
            if (err) {
                msg("Could not reset failed queue entries: " + err, "error");
            }

            pool.tika_queue.update({
                "status": 1
            }, {
                "$set": {
                    "status": 0
                }
            }, {
                multi: true
            }, function resetTikaQueue(err1) {
                if (err1) {
                    msg("Could not reset tika queue entries: " + err1, "error");
                }

                // kill the childs before exit
                _.each(spawned, function(worker) {
                    worker.kill();
                });
                return fn();
            });
        });
    };

    /**
        Processes finished Tika results waiting in pool.tika_f_queue.

        Guarded by the "tika_indexer_busy" lock from the message object:
        if enter() returns false (presumably a previous run from the
        setInterval timer still holds it) the call returns immediately.
        Up to 100 queue entries are fetched. For each entry the file at
        doc.content is read and parsed as JSON link details, which are
        handed to that.setCrawled; afterwards the queue entry is removed
        and the file deleted.
        - Unreadable file (may not be written to the cache yet): the entry
          is left in the queue for a later run.
        - File is not valid JSON: the page (doc.urlID) is marked abandoned
          with response "JSON_PARSE_ERROR", counted as a failed page, and
          the entry is removed.
        The lock is released once every fetched entry has been handled, or
        immediately when nothing was fetched or an error occurred.
        @private
    */
    function indexTikaDocs() {
        try {
            if (!message.get("tika_indexer_busy").enter()) {
                return;
            }

            pool.tika_f_queue.find({}, {
                limit: 100
            }).toArray(function(err, docs) {
                if (check.assigned(err) || !check.assigned(docs) || docs.length === 0) {
                    message.get("tika_indexer_busy").release();
                    return;
                }

                var done = 0;

                // Counts one entry as handled; releases the lock after the last one of this batch.
                function markDone() {
                    ++done;
                    if (done === docs.length) {
                        message.get("tika_indexer_busy").release();
                    }
                }

                // Removes an entry from pool.tika_f_queue, then deletes its file and calls `then`.
                function dequeue(doc, then) {
                    pool.tika_f_queue.remove({
                        _id: doc["_id"]
                    }, function(e, d) {
                        fs.unlink(doc["content"], then);
                    });
                }

                // `doc` is the per-call parameter, so each async callback below sees its own entry.
                _.each(docs, function(doc) {
                    try {
                        fs.readFile(doc["content"], function(err, data) {
                            //if file is not written yet to the cache race condition
                            if (!check.assigned(data) || check.assigned(err)) {
                                markDone();
                                return;
                            }

                            var link_details;
                            try {
                                link_details = JSON.parse(data.toString());
                            } catch (ee) {
                                pool.mongodb_collection.updateOne({
                                    "_id": ObjectId(doc["urlID"])
                                }, {
                                    $set: {
                                        "response": "JSON_PARSE_ERROR",
                                        "abandoned": true,
                                        "crawled": false
                                    }
                                }, function FailedTikaUpdateDoc(err, results) {
                                    dequeue(doc, function tika_doc_indexer() {
                                        msg("Tika doc parse error " + doc["urlID"], "error");
                                        bot_obj.updateStats("failedPages", 1);
                                        markDone();
                                    });
                                });
                                return;
                            }

                            that.setCrawled(link_details, function() {
                                dequeue(doc, function tika_doc_indexer() {
                                    msg('Tika doc indexed', 'success');
                                    markDone();
                                });
                            });
                        });
                    } catch (ee) {
                        // e.g. fs.readFile throws synchronously when doc.content is not a valid path
                        markDone();
                    }
                });
            });
        } catch (errr) {
            console.log(errr, "err");
            message.get("tika_indexer_busy").release();
        }
    }




    /*
        Polls every 5 seconds until Tika setup has finished (tika_setup is
        falsy) and begin_intervals is set. Then it stops polling and, unless
        running in webappOnly mode, schedules indexTikaDocs every second and
        records that timer in message's my_timers list.
    */
    var pool_check_mode = setInterval(function() {
        if (message.get('tika_setup') || !message.get('begin_intervals')) {
            return; // not ready yet, check again on the next tick
        }

        // ready: stop polling so the indexer timer is only scheduled once
        clearInterval(pool_check_mode);

        if (message.get('webappOnly')) {
            return;
        }

        var indexer_timer = setInterval(indexTikaDocs, 1000);
        message.get('my_timers').push(indexer_timer);
    }, 5000);
    /**
        Used to call Logger object with the caller function name.
        Forwards a message and its level to `log.put` (presumably a global
        logger — not defined in this view), tagged with this file's name and
        the name of the function that called msg. When the caller cannot be
        determined (caller is null, e.g. a call from top-level code) or is
        anonymous, an empty string is used instead of throwing.
        @private
        @param {String} text - message to log
        @param {String} level - log level, e.g. 'info', 'success', 'error'
    */
    function msg(text, level) {
        var caller = arguments.callee.caller;
        var caller_name = (caller && caller.name) ? caller.name.toString() : "";
        log.put(text, level, __filename.split('/').pop(), caller_name);
    }


};









module.exports = ChildManager;