Source: index.js

#!/usr/bin/env node


var fs = require('fs');
var check = require('check-types');
var _ = require("underscore");
var proto = require(__dirname + '/lib/proto.js');
process.getAbsolutePath = proto.getAbsolutePath;
var parent_dir = __dirname; //parent dir path for importing modules safely



var SeedLoader = require(parent_dir + "/lib/seed-reloader.js");
var Message = require(parent_dir + '/lib/message.js');
var ConfigLoader = require(parent_dir + "/lib/config-reloader.js");
var Pool = require(__dirname + '/lib/pool');
var ChildManager = require(parent_dir + '/lib/child_manager.js')
var Logger = require(parent_dir + "/lib/logger.js");
var ArgumentProcesser = require(__dirname + '/lib/argv.js');
var Cluster = require(parent_dir + '/lib/cluster.js')
var Bot = require(parent_dir + '/lib/bot.js');
var Robots = require(__dirname + '/lib/robots.js');
var death = require("death");
var Lock = require(parent_dir + '/lib/lock.js');
var crawler_obj;


/**
 * Class responsible for loading and executing all the crawler components in proper sequence.
 * Responsiblities:
 *      Loads other classes and instantiate them
 *      Supply all the requirements of other classes
 *      Creates singleton objects which are shared along the program
 * @constructor
 * @param {Object} args - object containing cmd line args
 *	@author Tilak Patidar <tilakpatidar@gmail.com>
 */

var Crawler = function(args) {

    /**
    	Stores Cluster object 
    	@private
    	@type {Cluster}
    */
    var cluster;
    /**
		Stores Logger object;
		@private
		@type {Logger}
	*/
    var log;
    /**
		Stores Config object;
		@private
		@type {Config}
	*/
    var config;
    /**
		Stores Seed object;
		@private
		@type {SeedLoader}
	*/
    var seed;

    /**
		Stores MongoDB object;
		@private
		@type {MongoDB}
	*/
    var mongo_pool;
    /**
		Stores ChildManager object;
		@private
		@type {ChildManager}
	*/
    var child_manager;

    /**
		Stores Bot object;
		@private
		@type {Bot}
	*/
    var bot_obj;

    //boolean vars below are created to make use of setInterval and 
    //make async nested callbacks to appear sync for code clearity

    /**
		Set to true when db loaded.
		@private
		@type boolean
	*/
    var isDBLoaded = false;
    /**
		Set to true when cluster started.
		@private
		@type boolean
	*/
    var isClusterStarted = false;
    /**
		Set to true when inputs parsed.
		@private
		@type boolean
	*/
    var isInputsParsed = false;
    /**
		Set to true when normal crawl can continue.
		@private
	*/
    var isNormalCrawl = false;
    /**
		Set to true when logger loaded.
		@private
		@type boolean
	*/
    var isLoggerLoaded = false;
    /**
		Stores current obj context for nested functions.
		@private
		@type boolean
	*/
    var that = this;

    var JSONX = proto["JSONX"]; //JSON for regex support in .json files
    //constructor functions


    /**
		Loads depcheck.js and check dependencies.
		Exits if dependencies not met.
		@private
    */
    function checkDependency() {
        var Dependency = require(__dirname + "/lib/depcheck.js");
        var dep_obj = new Dependency();
        dep_obj.check();
    };


    /**
		Calls the seed method of MongoDb.
		And loads the ChildManager into child_manager
		@param {Object} botObjs - Robots.txt parsed data
		@private
    */
    function startBotManager(botObjs) {


        //function to start the child_manager
        seed.seed(function(completed) {
            if (completed) {
                //create a child manager

                child_manager = new ChildManager(message_obj);
                //#debug#console.log(process.child_manager,"child_manager")				
            }


        });

    }


    /**
		Calls cleanUp and kill all active_pids on death event. Ctrl^C
		@private
    */
    function deathCleanUp(fn) {
        //console.log("CAUGHT TERMINATION ",message_obj.get('caught_termination'));
        if (message_obj.get('caught_termination')) {
            return;
        }

        if (!check.assigned(log)) {
            log = {};
            msg = function(msg, color) {
                console.log(msg)
            };
            log.flush = function() {

            };
        }

        message_obj.set('caught_termination', true);
        msg('Termination request processing', 'info');
        //console.log(crawler_obj, "crawler_obj");
        crawler_obj.cleanUp(function(done) {
            if (done) {
                //console.log(done,"done")
                process.nextTick(function() {
                    var pids = fs.readFileSync(__dirname + "/db/pids/active_pids.txt").toString().split("\n");
                    for (var i = 0; i < pids.length; i++) {
                        try {
                            //#debug#console.log(parseInt(pids[i]))
                            process.kill(parseInt(pids[i]));
                        } catch (err) {
                            //#debug#console.log(err)
                        }

                    };
                    fs.unlinkSync(__dirname + "/db/pids/active_pids.txt");
                    if (process.RUN_ENV === "TEST") {
                        message_obj.set("bot_stopped", true);
                    } else {
                        process.exit(0);
                    }


                });
            }
        });
    }


    /**
		All the process global vars go here
		@private
    */

    function setGlobals() {

        //all the process related code here

        process.setMaxListeners(50);
        if (process.env.EDITOR === undefined) {
            process.env.EDITOR = "/bin/nano";
        }

    }


    /**
		Creates instance of MongoDB.
		Calls createConnection in MongodB and set the DB object 
		in Config, Seed. Marks isDBLoaded to true.

		@public
		@param {Pool} p - Pool object, returns constructor for MongoDB
	
    */
    this.loadDB = function loadDB(p) {

        var DBConnector = p.getDB(); //choosing db type
        mongo_pool = new DBConnector(message_obj);
        mongo_pool.createConnection(function() {
            message_obj.set('pool', mongo_pool);
            config.setDB();
            seed.setDB();
            isDBLoaded = true;
        });

    };

    /**
		Sets Config in our private var config,
		@public
		@param {Config} c
    */
    this.loadConfig = function loadConfig(c) {

        config = c; //globals for sharing config and seed file across various modules
    };

    /**
		Sets Seed in our private var seed,
		@public
		@param {SeedLoader} s
    */
    this.loadSeed = function loadSeed(s) {
        seed = s;
    };

    /**
		Starts the cluster by creating cluster and bot object.
		@public
    */
    this.startCluster = function startCluster() {

        var interval_locked = new Lock();

        var tmp_interval = setInterval(function() {
            //console.log("in startCluster");
            if (interval_locked.isLocked() || !isDBLoaded || !isLoggerLoaded) {
                return;
            }
            interval_locked.enter();
            //console.log("pass startCluster");


            cluster = new Cluster(message_obj);
            message_obj.set('cluster', cluster);
            bot_obj = new Bot(message_obj);
            message_obj.set('bot', bot_obj);
            mongo_pool.setBot();
            cluster.setBot();
            bot_obj.startBot(message_obj.get('force_mode'), function(status) {

                if (status) {
                    //bot was started successfully
                    isClusterStarted = true;

                } else {
                    //unable to start bot exit gracefully
                    message_obj.set('stop_bot_and_exit');
                }
                clearInterval(tmp_interval);
            });

        }, 1000);

    };

    this.isStopped = function() {
        return message_obj.get('bot_stopped');
    };
    /**
		Reset the bot when --reset arg passed
		@public
		@param {Function} fn - Callback function
    */
    this.reset = function reset(fn) {
        //drop the db				
        mongo_pool.drop(function reset_pool_drop() {
            msg("db reset", "success");
            msg("robots cache reset", "success");




            //drop pdf store
            var files = fs.readdirSync(__dirname + '/pdf-store/');
            for (var i = 0; i < files.length; i++) {
                if (files[i].indexOf(".") === 0) {
                    //do not take hidden files
                    continue;
                }
                var domain = files[i].replace(/##/g, "/");
                var data = fs.unlinkSync(__dirname + '/pdf-store/' + files[i]);
            };
            msg("pdf-store cache reset", "success");

            //drop pdf store-parsed
            var files = fs.readdirSync(__dirname + '/pdf-store-parsed/');
            for (var i = 0; i < files.length; i++) {
                if (files[i].indexOf(".") === 0) {
                    //do not take hidden files
                    continue;
                }
                var domain = files[i].replace(/##/g, "/");
                var data = fs.unlinkSync(__dirname + '/pdf-store-parsed/' + files[i]);
            };
            msg("pdf-store-parsed cache reset", "success");

            try {
                var stream = fs.createWriteStream(__dirname + "/config/db_config.json");
                stream.write("{}");
                stream.close();
                msg("Db config cleared", "success");
            } catch (ee) {
                msg("Db config not cleared not cleared", "error");
            }


            msg("crawler reset", "success");

            return fn();



        });

    };


    /**
		Exits the crawler by calling cleanUp
		@public
    */
    this.exit = function exit(fn) {

        that.cleanUp(function(done) {
            if (done) {
                if (process.RUN_ENV === "TEST") {
                    fn();
                } else {
                    fn();
                    process.exit(0);
                }

            }

        });
    }


    /**
        Returns if bot started successfully
        @public
        @return {boolean} status - status from messages
    */
    this.isStarted = function() {
        return message_obj.get('success_start');
    };


    /**
		Parses input and sets overriden config returned by ArgumentParser to Config object.
		@public
		@param {ArgumentProcesser} argv_obj
    */
    this.processInput = function(argv_obj) {
        var interval_locked = new Lock();
        var tmp_interval = setInterval(function() {
            //console.log("process input interval");
            if (interval_locked.isLocked() || !isClusterStarted || !isDBLoaded || !isLoggerLoaded) {
                return;
            }

            interval_locked.enter();

            clearInterval(tmp_interval);
            var new_opts = argv_obj.parse(); //executes the args passed and returns overriden config

            var overriden_config = new_opts; //parses cmd line argv and perform required operations
            config.setOverridenConfig(overriden_config);
            isInputsParsed = true;

        }, 1000);
    };


    /**
        Returns if bot stopped. 
        Default null. When stopped returns true.
        @public
        @return {boolean} status
    */
    this.botStopped = function() {

        return message_obj.get('bot_stopped');
    }

    /**
		When args is parsed this is called to select the action of crawler.
		@public
    */
    this.selectInput = function selectInput() {
        var interval_locked = new Lock();
        var tmp_interval = setInterval(function() {
            //console.log("selectInput interval");
            if (interval_locked.isLocked() || !isInputsParsed || !isDBLoaded || !isClusterStarted || !isLoggerLoaded) {
                return;
            }
            interval_locked.enter();
            clearInterval(tmp_interval);
            if (!message_obj.get('modifyConfig')) { //set to true by argv if --config is selected stops bot from starting if this option is selected

                config.pullConfig(function() {

                    mongo_pool.checkIfNewCrawl(function() {

                        //notify that bot started successfully
                        message_obj.set('success_start', true);


                        if (message_obj.get('editSeedFile')) {
                            seed.editSeedFile();
                        } else if (message_obj.get('removeSeed')) {
                            seed.removeSeed(Object.keys(message_obj.get('removeSeed'))[0]);
                        } else if (message_obj.get('seedFilePath')) {
                            seed.seedFile(message_obj.get('seedFilePath'), null, function() {
                                message_obj.set("stop_bot_and_exit");
                            });
                        } else if (message_obj.get('reset')) {
                            that.reset(function() {
                                message_obj.set('stop_bot_and_exit');
                            });
                        } else {
                            seed.pull(function() {
                                seed.readSeedFile(function readSeedFile() {
                                    isNormalCrawl = true;
                                });
                            });
                        }


                    });
                });

            }

        }, 1000);
    };

    /**
		When no special args are given this is called by this.selectInput
		@public
    */
    this.startNormalCrawl = function startNormalCrawl() {
        var interval_locked = new Lock();
        var tmp_interval = setInterval(function() {
            //console.log("start normal interval");
            if (interval_locked.isLocked() || !isInputsParsed || !isDBLoaded || !isClusterStarted || !isNormalCrawl || !isLoggerLoaded) {

                return;
            }

            interval_locked.enter();
            clearInterval(tmp_interval);
            var botObjs = {}; //will store robots.txt data for seed links

            if (config.getConfig("allow_robots") && !process.webappOnly) {

                /*	
                	if robots.txt has to be followed
                	we have to download all robots.txt files
                */

                msg("downloading robots.txt this could take a while", "info");



                message_obj.set('robots_links', Object.keys(message_obj.get('links_store')));

                var robots = new Robots(message_obj, config.getConfig("robots_parser_threads"));

                robots.parse(function robots_init(err, obj) {

                    if (obj) {
                        msg("robots.txt parsed", "success");
                    } else {
                        msg("robots.txt parsing failed", "error");
                    }
                    message_obj.set('botObjs', obj);
                    startBotManager();


                });
            } else {
                startBotManager();

            }

            if (!process.modifyConfig && !process.editSeedFile) {
                //to disable detection of Ctrl^X if nano editor is on

                (function(crawler_obj, message_obj, msg) {

                    death(deathCleanUp);

                })(this, message_obj, msg);


            }
        }, 1000);
    };


    /**
		Performs clean up operations before closing crawler.
		@public
		@param {Function} fn - Callback
    */
    this.cleanUp = function cleanUp(fn1) {
        msg("Performing cleanUp ", "info");
        bot_obj.stopBot(function() {
            try {
                process.kill(message_obj.get('tikaPID'), "SIGINT");
            } catch (err) {
                //console.log(err);
                //trying to kill the tika server jar
            }
            //console.log(1);
            if (!check.assigned(cluster.cluster_server)) {
                cluster.cluster_server = {}
            }
            //console.log(101);
            if (!check.assigned(cluster.file_server)) {
                cluster.file_server = {}
            }
            //console.log(102);
            if (!check.assigned(cluster.fileServer)) {
                cluster.fileServer = {}
            }
            //console.log(103);
            if (!check.assigned(cluster.cluster_server.shutdown)) {
                cluster.cluster_server.shutdown = function(fn) {
                    fn();
                };
            }
            //console.log(104);
            if (!check.assigned(cluster.file_server.shutdown)) {
                cluster.file_server.shutdown = function(fn) {
                    fn();
                };
            }
            //console.log(105);
            if (!check.assigned(cluster.fileServer.shutdown)) {
                cluster.fileServer.shutdown = function(fn) {
                    if (fn.constructor.name === 'Function') fn();
                };
            }
            // console.log(106);
            if (!check.assigned(child_manager)) {
                child_manager = {};
                child_manager.setManagerLocked = function(fn) {

                    if (fn.constructor.name === 'Function') fn();
                };
                child_manager.killWorkers = function(fn) {

                    if (fn.constructor.name === 'Function') fn();

                };
                child_manager.flushInlinks = function(fn) {
                    if (fn.constructor.name === 'Function') fn();
                };
            }

            //console.log(107);
            child_manager.setManagerLocked(true); //lock the manager so no new childs are spawned
            //#debug#console.log(cluster.cluster_server,cluster.file_server)
            child_manager.flushInlinks(function(status) {
                //console.log(108,child_manager);

                //flush all the inlinks into db before exit
                child_manager.killWorkers(function() {
                    //console.log(109);

                    //clear timers
                    var timers = message_obj.get('my_timers');
                    for (var i = 0; i < timers.length; i++) {
                        clearInterval(timers[i]);
                    };
                    //console.log(110);

                    cluster.cluster_server.shutdown(function() {
                        cluster.file_server.shutdown(function() {
                            cluster.fileServer.shutdown(function() {

                                //console.log(1111);


                                //clear all moduele references
                                //#debug#console.log(process.bot);
                                bot_obj.stopBot(function cleanUp_stopbot(err) {
                                    //if (err) throw err;

                                    msg("cleanUp done", "success");

                                    //flushing the log
                                    log.flush(function() {


                                        mongo_pool.close(function() {
                                            return fn1(true);
                                        });

                                    });
                                });
                            });





                        });


                    });
                });

            }); //kill all the workers before quiting
        })
    };


    /**
		Restarts the bot.
		@public
    */
    this.restart = function restart(fn) {
        //restart
        that.cleanUp(function(done) {
            if (done) {
                var spawn = require('child_process').spawn;
                var file_path = __dirname + '/index.js';
                var ls = spawn(config.getConfig("env"), [file_path], {
                    stdio: 'inherit'
                });
                fs.appendFileSync(__dirname + "/db/pids/active_pids.txt", ls.pid + "\n");
                //ls.stdout.pipe(process.stdout);
                //process.exit(0);			
                ls.on("exit", function() {
                    if (process.RUN_ENV === "TEST") {
                        message_obj.set("bot_stopped", true);
                    } else {
                        process.exit(0)
                    }

                });

            }



        });
    };
    /**
		Sets the Logger object in all Crawler components.
		@public
		@param {Logger} l
    */
    this.setLogger = function setLogger(l) {
        var interval_locked = new Lock();
        var tmp_interval = setInterval(function() {
            //console.log("logger interval");
            if (interval_locked.isLocked() || !isDBLoaded) {
                return;
            }

            log = l;

            message_obj.set('log', log);


            interval_locked.enter();

            clearInterval(tmp_interval);
            isLoggerLoaded = true;
        }, 1000);
    };


    checkDependency();

    /**
		Message object which is shared with all the crawler components.
		@private
		@type {Message}
    */
    var message_obj = new Message();
    message_obj.set('crawler', this);


    //some args need to be parsed before
    var argv = require('minimist')(args);
    if (check.assigned(argv["force"])) {
        message_obj.set('force_mode', true);
    }


    /**
    	Main method of the Crawler.
    	Executes the crawler by loading all components.
    	@public
    */
    this.run = function run() {



        fs.appendFileSync(__dirname + "/db/pids/active_pids.txt", process.pid + "\n");


        var config_obj = new ConfigLoader(message_obj);
        message_obj.set('config', config_obj);
        var log_obj = new Logger(message_obj);
        var seed_obj = new SeedLoader(message_obj);
        message_obj.set('seed', seed_obj);
        var pool_obj = new Pool(message_obj);
        message_obj.set('argv', argv);
        var argv_obj = new ArgumentProcesser(message_obj);


        that.loadConfig(config_obj);
        that.loadSeed(seed_obj);
        that.loadDB(pool_obj);

        that.setLogger(log_obj);
        that.startCluster();
        that.processInput(argv_obj);
        that.selectInput();
        that.startNormalCrawl();



    };


    /**
		Used to call Logger object with the caller function name.
		@private
	*/
    function msg() {
        log.put(arguments[0], arguments[1], __filename.split('/').pop(), arguments.callee.caller.name.toString());
    }

}



if (require.main === module) {
    crawler_obj = new Crawler(process.argv.slice(2));
    crawler_obj.run();

} else {

    module.exports = Crawler;

}