Source: spawn.js

var proto = require(__dirname + "/lib/proto.js");
var URL = require(__dirname + "/lib/url.js");
var child = require('child_process');
process.getAbsolutePath = proto.getAbsolutePath;
var parent_dir = process.getAbsolutePath(__dirname);
var _ = require('underscore');

//using dnscache
//from now on all the calls made to dns module are wrapped by the cache
//this will provide dns cache in request module
var dns = require('dns'),
dnscache = require('dnscache')({
    "enable" : true,
    "ttl" : 300,
    "cachesize" : 1000
});

var request = require("request");
var check = require("check-types");
var colors = require('colors');
var urllib = require('url');
var crypto = require('crypto');
var Logger = require(__dirname + "/lib/logger.js");
var Message = require(__dirname + "/lib/message.js");
var Lock = require(__dirname + "/lib/lock.js");
/**
    Represents a spawned fetcher process.
    @constructor
    @author Tilak Patidar <tilakpatidar@gmail.com>

*/
var Spawn = function() {
    var message;
    var that = this;
    var log;
    /**
        Stores the fetch queue.
        @private
        @type {Array}

    */
    var GLOBAL_QUEUE = [];
    /**
        http request pool
        @private
    */
    var separateReqPool;
    var regex_urlfilter = {};
    var config = require(__dirname + "/lib/spawn_config.js");

    /**
        Global counter to store number of urls queued.
        @private
        @type {Number}
    */
    this.queued = 0;
    this.active_sockets = 0;
    this.batch = {};
    this.batchSize = 0;
    this.batchId = 0;
    this.refresh_label = null;
    this.links = [];
    this.botObjs = {};
    this.lastAccess = {};
    this.bot_type;
    this.domain_group_id = null;

    this.getTask = function getTask(fn) {
        process.on('message', function process_on_msg(data) {
            //recieving instructions from parent
            var k = data["init"];
            if (k) {
                that.batch = k[0];
                that.batchSize = k[1];
                that.links = k[2];
                that.botObjs = k[3];
                that.batchId = k[4];
                that.refresh_label = k[5];
                config = config.init(k[6], k[7], k[8]);
                message = new Message();
                message.set('config', config);
                message.set('links_store', that.links);
                that.bot_type = k[9];
                that.domain_group_id = k[10];
                log = new Logger(message);
                message.set('log', log);
                separateReqPool = {
                    maxSockets: config.getConfig("http", "max_sockets_per_host")
                };
                regex_urlfilter["accept"] = config.getConfig("accept_regex");
                regex_urlfilter["reject"] = config.getConfig("reject_regex");
                message.set('regex_urlfilter', regex_urlfilter);
                //prepare regexes
                URL = new URL(message);
                process.http_proxy = config.getConfig("http", "http_proxy");
                process.https_proxy = config.getConfig("http", "https_proxy");
                that.refresh_label = config.getConfig("default_recrawl_interval");
                return fn(that.batch);
            }
        });
    };

    this.queueLinks = function queueLinks(pools) {
        that.queued = 0;
        for (var i = 0; i < pools.length; i++) {
            if (check.assigned(pools)) {
                if (that.bot_type === "normal") {
                    var url = pools[i]['url'];
                    var domain = pools[i]['domain'];
                    try {
                        var link = URL.url(url, domain);
                        link.setNormalQueue();

                        link.setBucketId(pools[i]['bucket_id']);
                        link.setUrlId(pools[i]['_id']);
                        (function(link) {
                            setTimeout(function() {
                                that.processLink(link);
                            }, 100); //to avoid recursion
                        })(link);
                    } catch (err) {
                        console.log(err);
                    }
                } else if (that.bot_type === "failed_queue") {
                    var url = pools[i]['url'];
                    var domain = pools[i]['domain'];
                    var parent = pools[i]['parent'];
                    try {
                        var link = URL.url(url, domain, parent);
                        link.setFailedQueue();

                        link.setBucketId(pools[i]['bucket_id']);
                        link.setUrlId(pools[i]['_id']);
                        (function(link) {
                            setTimeout(function() {
                                that.processLink(link);
                            }, 100); //to avoid recursion
                        })(link);
                    } catch (err) {
                        console.log(err);
                    }
                }


            } else {
                break;
            }

        };

    };

    this.processLink = function processLink(link) {
        var bot = this; //inside setTimeout no global access
        //console.log(bot.batchId,"   ",link.details.url , 'ask access');
        //console.log(bot.batchId,"   ",bot.active_sockets, "    ",config.getConfig("http","max_concurrent_sockets"));
        if (!check.assigned(link)) {
            return;
        }
        if (that.active_sockets >= config.getConfig("http", "max_concurrent_sockets")) {
            //pooling to avoid socket hanging
            GLOBAL_QUEUE.push(link);
            return;

        }
        that.active_sockets += 1;


        if (check.assigned(that.botObjs)) {
            //if robots is enabled
            //check if access is given to the crawler for requested resource
            var robot = that.botObjs[link.details.domain];
            if (check.assigned(robot) && !robot["NO_ROBOTS"]) {
                robot = that.addProto(robot);
                robot.canFetch(config.getConfig("robot_agent"), link.details.url, function canFetch1(access, crawl_delay) {
                    if (!access) {
                        msg(("Cannot access " + link.details.url), "error");
                        // access not given exit 

                        try {
                            link.setStatusCode(403);
                            link.setResponseTime(0);
                            link.setParsed({});
                            link.setContent({});
                            process.send({
                                "bot": "spawn",
                                "setCrawled": link.details
                            });

                        } catch (err) {
                            //msg("Child killed","error")
                        } finally {
                            that.active_sockets -= 1;
                        }
                        return that.isLinksFetched();

                    } else {
                        //#debug#("access "+url+" crawl_delay "+crawl_delay);
                        that.scheduler(link, crawl_delay);

                    }
                });

            } else {
                //no robots file for asked domain
                that.fetch(link); //constraints are met let's fetch the page
            }

        } else {
            that.fetch(link); //constraints are met let's fetch the page
        }

    };

    this.fetch = function fetch(link) {
        if (!config.getConfig("verbose")) {
            msg(link.details.url, "no_verbose");
        }
        if (link.details.file_type === "file") {
            that.fetchFile(link);
        } else if (link.details.file_type === "webpage") {
            that.fetchWebPage(link);
        }


    };

    this.grabInlinks = function grabInlinks($, url, domain, linksFromParsers) {
        for (var i = 0; i < linksFromParsers.length; i++) {
            var q = linksFromParsers[i];
            try {
                process.send({
                    "bot": "spawn",
                    "addToPool": [q, q, url, config.getConfig("default_recrawl_interval")]
                });
            } catch (err) {
                //msg("Child killed","error")
            }
        };
        var a = $("a")
            //console.log(a);
        var count = a.length;
        url = URL.url(url, domain);

        a.each(function grabInlinks_each() {

            //do not follow links with rel = 'nofollow'
            var rel = $(this).attr('rel');
            if (check.assigned(rel) && rel === "nofollow") {
                --count;
                return;
            }

            var href = $(this).attr("href");
            //console.log(href);
            if (check.assigned(href)) {
                //#debug#("url "+href);

                //console.log(abs);
                var link = URL.url(href, domain);

                if (config.getConfig("web_graph")) {
                    try {
                        if (url.details.nutch_key.split(":")[0] !== link.details.nutch_key.split(":")[0]) {
                            //storing just outlink relations
                            process.send({
                                "bot": "spawn",
                                "graph": [link.details.url, url.details.url]
                            });
                        }

                    } catch (errr) {

                    }
                }
                if (!link.details.accepted) {
                    --count;
                    return;
                }
                //console.log(link.details.url);

                try {
                    process.send({
                        "bot": "spawn",
                        "addToPool": [link.details.url, link.details.domain, url.details.url, config.getConfig("default_recrawl_interval")]
                    });

                } catch (err) {
                    msg("Child killed", "error")
                }

            }

        });
        msg(("Got " + count + " links from " + url.details.url), "info");
    };

    this.isLinksFetched = function isLinksFetched() {
        that.queued += 1;
        if (that.queued >= that.batch.length) {
            try {
                process.send({
                    "bot": "spawn",
                    "finishedBatch": [that.batchId, that.refresh_label, that.bot_type]
                });
                setTimeout(function() {
                    process.exit(0);
                }, 5000);
            } catch (err) {
                //  msg("Child killed","error")
            }


        }

    };

    this.addProto = function addProto(robot) {
        robot.canFetch = function canFetch(user_agent, url, allowed) {
            var crawl_delay = parseInt(this.defaultEntry["crawl_delay"]) * 1000; //into milliseconds
            if (isNaN(crawl_delay) || !check.assigned(crawl_delay)) {

                crawl_delay = config.getConfig("http", "delay_request_same_host");
            }
            if (this.allowAll) {
                return allowed(true, crawl_delay);

            } else if (this.disallowAll) {
                return allowed(false, crawl_delay);

            }
            var rules = this.defaultEntry["rules"];
            if (!check.assigned(rules)) {
                return allowed(true, crawl_delay);
            }

            for (var i = 0; i < rules.length; i++) {
                var path = decodeURIComponent(rules[i].path);
                var isallowed = rules[i].allowance;

                var given_path = "/" + url.replace("http://", "").replace("https://", "").split("/").slice(1).join("/");
                if (given_path === path && isallowed) {
                    return allowed(true, crawl_delay);
                } else if (given_path === path && !isallowed) {
                    return allowed(false, crawl_delay);
                }
            };
            //if no match then simply allow
            return allowed(true, crawl_delay);
        };
        return robot;
    };

    this.scheduler = function scheduler(link, time) {
        if (time === 0) {
            //queue immediately
            return that.fetch(link);
        } else {
            var lastTime = that.lastAccess[link.details.domain];
            if (!check.assigned(lastTime)) {
                //first time visit,set time
                that.lastAccess[link.details.domain] = new Date().getTime();
                that.fetch(link);
            } else {
                that.queueWait(link, time);
            }
        }

    };

    this.queueWait = function queueWait(link, time) {
        var lastTime = that.lastAccess[link.details.domain];
        var current_time = new Date().getTime();
        if (current_time < (lastTime + time)) {
            that.lastAccess[link.details.domain] = current_time;
            that.fetch(link);
        } else {
            (function(link, time) {
                setTimeout(function() {
                    that.queueWait(link, time);
                }, Math.abs(current_time - (lastTime + time)));
            })(link, time);
        }
    };

    this.fetchWebPage = function fetchWebPage(link) {
        var req_url = link.details.url;
        //console.log(bot.links, link.details.domain);

        //console.log(req_url);
        var req = request({
            uri: req_url,
            followRedirect: true,
            pool: separateReqPool,
            timeout: config.getConfig("http", "timeout")
        });
        var html = [];
        var done_len = 0;
        var init_time = new Date().getTime();
        var sent = false;
        req.on("response", function req_on_response(res) {

            if (check.assigned(res) && check.assigned(res.headers.location) && res.headers.location !== req_url) {
                //if page is redirect
                msg(req_url + " redirected to " + res.headers.location, 'info');
                link.setRedirectedURL(res.headers.location);

            }
            if (check.assigned(res) && check.assigned(res.headers['content-type'])) {
                link.setHeaderContentType(res.headers['content-type']);
                var allowed = config.getConfig('http', 'accepted_mime_types');
                var tika_allowed = config.getConfig("tika_supported_mime");
                var match = false;
                var tika_match = false;

                _.each(allowed, function(e, index) {
                    if (res.headers['content-type'].indexOf(allowed[index]) >= 0) {
                        match = true;
                    }

                });


                _.each(tika_allowed, function(e, index) {
                    if (res.headers['content-type'].indexOf(tika_allowed[index]) >= 0) {
                        tika_match = true;
                    }
                });

                if (!match && !tika_match) {
                    req.emit('error', "MimeTypeRejected");
                }
                if (tika_match) {
                    msg("Tika mime type found transfer to tika queue " + link.details.url, 'info');
                    that.fetchFile(link);
                    req.emit('error', 'TikaMimeTypeFound');
                }

            }


            var len = parseInt(res.headers['content-length'], 10);
            if (!check.assigned(len) || !check.number(len)) {
                len = 0;
            }
            if (len > config.getConfig("http", "max_content_length")) {
                req.emit('error', "ContentOverflow");

            }
            res.on("data", function res_on_data(chunk) {
                done_len += chunk.length;
                var c = chunk.toString();
                //console.log(c,"c");
                html.push(c);
                var t = new Date().getTime();
                if ((t - init_time) > config.getConfig("http", "callback_timeout")) {
                    req.emit('error', "ETIMEDOUT_CALLBACK");
                }
                if (done_len > config.getConfig("http", "max_content_length")) {
                    req.emit('error', "ContentOverflow");
                }
            });
            res.on("error", function res_on_error(err) {
                //#debug#(err )
                //console.log(err,err.type)
                req.emit("error", err);

            });
            res.on("end", function res_on_end() {

                html = html.join("");
                //console.log(html);
                if (html.length === 0) {
                    //zero content recieved
                    //raise error bec otherwise it will create a md5 to which all the other empty urls will become canonical to
                    try {
                        link.setStatusCode(res.statusCode + "_EMPTY_RESPONSE");
                        link.setParsed({});
                        link.setResponseTime(0);
                        link.setContent({});
                        if (!sent) {
                            process.send({
                                "bot": "spawn",
                                "setCrawled": link.details
                            });

                        }
                    } catch (err) {
                        //  msg("Child killed","error")
                    } finally {
                        if (!sent) {
                            sent = true;
                            that.active_sockets -= 1;
                        }
                    }

                    return that.isLinksFetched();
                }
                var t = new Date().getTime();
                var response_time = t - init_time;
                if (!check.assigned(html)) {
                    //some error with the request return silently
                    msg("Max sockets reached read docs/maxSockets.txt", "error");
                    try {
                        link.setStatusCode(-1);
                        link.setParsed({});
                        link.setResponseTime(0);
                        link.setContent({});
                        if (!sent) {
                            process.send({
                                "bot": "spawn",
                                "setCrawled": link.details
                            });

                        }
                    } catch (err) {
                        //  msg("Child killed","error")
                    } finally {
                        if (!sent) {
                            sent = true;
                            that.active_sockets -= 1;
                        }
                    }

                    return that.isLinksFetched();
                }
                try {
                    var md5sum = crypto.createHash('md5');
                    md5sum.update(html);
                    var hash = md5sum.digest('hex');
                    link.setContent(html);
                    link.setContentMd5(hash);
                } catch (err_md) {

                }

                var Parser = require(__dirname + "/parsers/" + that.links[link.details.domain]["parseFile"]);
                var parser_obj = new Parser(config);
                parser_obj.parse(html, link.details.url, function(dic) {

                    //pluggable parser
                    var inlinksGrabbed = 1;
                    var parser_msgs = dic[4];
                    var default_opt = false;
                    var special_opt = false;
                    //check rss feeds
                    var feeds = dic[1]._source["rss_feeds"];
                    if (check.assigned(feeds) && feeds.length !== 0) {
                        try {
                            process.send({
                                "bot": "spawn",
                                "insertRssFeed": [link.details.url, feeds, that.domain_group_id]
                            });
                        } catch (err) {
                            console.log(err);
                        } finally {}


                    }
                    //check for author tag
                    if (check.assigned(dic[1]._source["author"])) {
                        try {
                            var d = {};
                            var author_link = URL.url(dic[1]._source["author"], link.details.domain);
                            d[author_link.details.url] = link.details.url;
                            process.send({
                                "bot": "spawn",
                                "insertAuthor": d
                            });
                        } finally {

                        }
                    }

                    //DEFAULTS
                    link.setStatusCode(res.statusCode);
                    link.setParsed(dic[1]);
                    link.setResponseTime(response_time);
                    link.setContent(html);
                    if (check.assigned(parser_msgs) && !check.emptyObject(parser_msgs)) {
                        //link.setStatusCode("META_BOT"); //bec going to concat status
                        _.each(parser_msgs, function(e, parser_msg_key) {

                            var parser_msg = parser_msgs[parser_msg_key];
                            //console.log("############## PARSER MSG ############",parser_msg_key,parser_msg);
                            switch (parser_msg_key) {
                                case "noindex":
                                    special_opt = true;
                                    try {
                                        link.setStatusCode(link.getStatusCode() + "_NOINDEX");
                                        link.setParsed({});
                                        link.setContent({});
                                        //no index but follow links from this page
                                        ++inlinksGrabbed;
                                    } catch (error) {

                                    }
                                    break;
                                case "nofollow":
                                    special_opt = true;
                                    try {
                                        link.setStatusCode(link.getStatusCode() + "_NOFOLLOW");
                                        inlinksGrabbed = -1000;
                                        //do not grab links from this page
                                    } catch (error) {

                                    }
                                    break;
                                case "canonical":
                                    special_opt = true;
                                    //console.log("############## HERE canonical################");
                                    try {
                                        var clink = URL.url(parser_msg);
                                        if (clink.details.url !== link.details.url) {
                                            //if canonical url and this url is not same
                                            link.setCanonicalUrl(parser_msg);
                                        }
                                        ++inlinksGrabbed;

                                        //do not grab links from this page
                                    } catch (error) {}
                                    break;
                                case "alternate":
                                    special_opt = true;
                                    _.each(parser_msg, function(e, ind) {
                                        link.addAlternateUrl(parser_msg[ind]);
                                    });

                                    ++inlinksGrabbed;
                                    //do not grab links from this page

                                    break;
                                case "content-type-reject":
                                    special_opt = true;

                                    link.setStatusCode("ContentTypeRejected");
                                    link.setParsed({});
                                    link.setContent({});
                                    //do not grab links from this page
                                    inlinksGrabbed = -1000;

                                    break;
                                case "content-lang-reject":
                                    special_opt = true;

                                    link.setStatusCode("ContentLangRejected");
                                    link.setParsed({});
                                    link.setContent({});
                                    //do not grab links from this page
                                    inlinksGrabbed = -1000;
                                    break;
                                case "none":
                                    special_opt = true;
                                    try {
                                        link.setStatusCode(link.getStatusCode() + "_NOFOLLOW_NOINDEX");
                                        link.setParsed({});
                                        link.setContent({});
                                        inlinksGrabbed = -1000;
                                    } catch (error) {

                                    }
                                    break;
                                default:
                                    default_opt = true;


                            };
                        });
                    }

                    if ((check.assigned(parser_msgs) && !check.emptyObject(parser_msgs)) && special_opt) {
                        //means one of the above cases met just send the response to setCrawled
                        if (inlinksGrabbed > 0) {
                            that.grabInlinks(dic[0], link.details.url, link.details.domain, dic[2]);
                        }

                        try {
                            if (!sent) {
                                process.send({
                                    "bot": "spawn",
                                    "setCrawled": link.details
                                });


                            }
                        } catch (err) {
                            //msg("Child killed","error")
                        } finally {
                            if (!sent) {
                                sent = true;
                                that.active_sockets -= 1;
                            }
                        }
                        return that.isLinksFetched();
                    } else {
                        //no msg recieved or no cases matched
                        //go for default send
                        //if((!check.assigned(parser_msgs) || check.emptyObject(parser_msgs)) || default_opt){  
                        //dic[0] is cheerio object
                        //dic[1] is dic to be inserted
                        //dic[2] inlinks suggested by custom parser
                        that.grabInlinks(dic[0], link.details.url, link.details.domain, dic[2]);

                        var code = res.statusCode;
                        //console.log(code,"code")
                        try {
                            //console.log("Coming here ",link.details.url);
                            link.setStatusCode(code);
                            link.setParsed(dic[1]);
                            link.setResponseTime(response_time);
                            link.setContent(html);
                            if (!sent) {
                                process.send({
                                    "bot": "spawn",
                                    "setCrawled": link.details
                                });


                            }
                        } catch (err) {
                            //msg("Child killed","error")
                        } finally {
                            if (!sent) {
                                sent = true;
                                that.active_sockets -= 1;
                            }

                        }


                        return that.isLinksFetched();
                        //} 
                    }

                });
            });
        });
        req.on("error", function req_on_error(err) {
            //#debug#(err)
            //console.log("req  ",err,err.type)
            var message = err;
            if (message === "ETIMEDOUT_CALLBACK") {
                msg("Connection timedout change http.callback_timeout setting in config", "error");
                try {
                    link.setStatusCode("ETIMEDOUT_CALLBACK");
                    link.setParsed({});
                    link.setResponseTime(0);
                    link.setContent({});
                    if (!sent) {
                        process.send({
                            "bot": "spawn",
                            "setCrawled": link.details
                        });
                    }
                } catch (err) {
                    //msg("Child killed","error")
                } finally {
                    if (!sent) {
                        sent = true;
                        that.active_sockets -= 1;
                    }
                }

                return that.isLinksFetched();
            } else if (message === "ContentOverflow") {
                msg("content-length is more than specified", "error");
                try {
                    link.setStatusCode("ContentOverflow");
                    link.setParsed({});
                    link.setResponseTime(0);
                    link.setContent({});
                    if (!sent) {
                        process.send({
                            "bot": "spawn",
                            "setCrawled": link.details
                        });

                    }
                } catch (err) {
                    //msg("Child killed","error")
                } finally {
                    if (!sent) {
                        sent = true;
                        that.active_sockets -= 1;
                    }
                }

                return that.isLinksFetched();

            } else if (message === "MimeTypeRejected") {
                msg("mime type rejected for " + link.details.url, "error");
                try {
                    link.setStatusCode("MimeTypeRejected");
                    link.setParsed({});
                    link.setResponseTime(0);
                    link.setContent({});
                    if (!sent) {
                        process.send({
                            "bot": "spawn",
                            "setCrawled": link.details
                        });

                    }
                } catch (err) {
                    //msg("Child killed","error")
                } finally {
                    if (!sent) {
                        sent = true;
                        that.active_sockets -= 1;
                    }
                }

                return that.isLinksFetched();

            } else if (message === "'TikaMimeTypeFound'") {
                //we already called fetch file just pass 
            } else {
                try {
                    var code;
                    if (err.code === 'ETIMEDOUT') {
                        msg(err, "error");
                        if (err.connect === true) {
                            code = 'ETIMEDOUT_CONNECTION'
                        } else {
                            code = 'ETIMEDOUT_READ'
                        }
                    } else {
                        code = err.code;
                    }
                    link.setStatusCode(code);
                    link.setParsed({});
                    link.setResponseTime(0);
                    link.setContent({});
                    if (!sent) {
                        process.send({
                            "bot": "spawn",
                            "setCrawled": link.details
                        });

                    }
                } catch (errr) {

                } finally {
                    if (!sent) {
                        sent = true;
                        //console.log(that.active_sockets, "before");
                        that.active_sockets -= 1;
                        //console.log(that.active_sockets, "after");
                    }
                }
                return that.isLinksFetched();
            }


        });


    };

    this.fetchFile = function fetchFile(link) {
        //files will be downloaded by seperate process
        //console.log("files    "+link.details.url)
        var p = that.links[link.details.domain]["parseFile"];
        code = "inTikaQueue";
        try {
            link.setStatusCode(code);
            link.setParsed({});
            link.setResponseTime(0);
            link.setContent({});
            process.send({
                "bot": "spawn",
                "setCrawled": link.details
            });
            var dict = {
                fileName: link.details.url,
                parseFile: p,
                status: 0,
                link_details: link.details
            };
            process.send({
                "bot": "spawn",
                "insertTikaQueue": dict
            });
        } catch (err) {
            console.log(err);
        } finally {
            that.active_sockets -= 1;
        }

        return that.isLinksFetched();


    };




    var global_queue_lock = new Lock();

    this.getTask(function getTask(links) {
        //console.log(links, "for fetch");
        that.queueLinks(links);

        setInterval(function global_queue_pusher() {

            if (!global_queue_lock.enter()) {
                return;
            }


            var len = GLOBAL_QUEUE.length;
            //console.log(len, "GLOBAL_QUEUE.length");

            for (var i = 0; i < len; i++) {
                that.processLink(GLOBAL_QUEUE.pop());
            };

            global_queue_lock.release();

            log.put(that.batchId + " <- bot id     " + that.queued + " <-  queued urls   " + that.batch.length + " <- batch length  " + that.active_sockets + " <- active_sockets", 'info');
        }, 5000);

    });

    function msg() {
        log.put(arguments[0], arguments[1], __filename.split('/').pop(), arguments.callee.caller.name.toString());
    }
};

var spawn_obj = new Spawn();