Source: lib/bucket.js

var ObjectId = require('mongodb').ObjectId;
var Immutable = require('immutable');
var path = require('path');
var parent_dir = path.resolve(__dirname, '..'); //project root, one level above lib/
var Score = require(parent_dir + '/lib/score.js');
var proto = require(parent_dir + '/lib/proto.js');
var _ = require('underscore');
var check = require('check-types');
var URL = require(parent_dir + '/lib/url.js');
var ProxyCache = require(parent_dir + '/lib/bucket_proxy.js');

/**
    Represents a bucket of URLs.
    Responsible for creating buckets from inlinks and cached URLs.
    @constructor
    @author Tilak Patidar <tilakpatidar@gmail.com>
    @param {Message} message_obj shared state object exposing get/set
*/
var Bucket = function(message_obj) {
    var message = message_obj;
    var config = message.get('config');
    var log = message.get('logger');
    var pool = message.get('pool');
    var that = this;

    /**
        Interval in milliseconds after which the bucket creator is invoked.
        @private
        @type {Number}
    */
    var bucket_time_interval = 10000;

    /**
        Stores the interval handle returned by setInterval for the
        bucket creator.
        @private
        @type {Object}
    */
    var bucket_timer;

    var bucket_priority = message.get('bucket_priority');
    var links_store = message.get('links_store');
    var score = new Score(message);
    var bot_obj = message.get('bot');


    /**
        Stores the number of remaining urls in each bucket
        @private
        @type {Object}
    */
    var remaining_urls_store = {};
    /**
        Tracks, per domain + bucket id, whether the db has already been
        queried once, so the same round does not reload urls twice.
        @private
        @type {Object}
    */
    var cache_calls = {};

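    /**
        Dequeues up to `count + remaining` urls for a domain, serving from
        the in-memory proxy cache first and falling back to mongodb when the
        cache yields less than half of the requested amount.
        @param {String} domain
        @param {Number} count number of urls requested
        @param {String|Number} interval fetch-interval label
        @param {ObjectId} bucket_id id of the bucket being filled
        @param {Function} fn callback invoked with the array of urls
    */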
    this.dequeue = function bucketoperation_dequeue(domain, count, interval, bucket_id, fn) {

        var remaining_urls = remaining_urls_store[bucket_id];

        if (!check.assigned(remaining_urls)) {
            remaining_urls_store[bucket_id] = 0;
            remaining_urls = 0;
        }

        if (!check.assigned(message.get('proxy_cache'))) {
            //first initialization of the shared proxy cache
            message.set('proxy_cache', new ProxyCache(config.getConfig('inlink_cache_size')));
        }

        var cache_urls = message.get('proxy_cache').fetchURLs(domain, count + remaining_urls);
        msg('Got ' + cache_urls.length + ' urls from links cache for ' + domain, 'success');

        if (cache_urls.length < 0.5 * count) {

            //the cache served less than half of the request, so top up from
            //the db; cache_calls guards against querying the db twice for the
            //same domain + bucket, since the first query already loaded five
            //times the batch size
            if (cache_calls[domain + bucket_id.toString()]) {
                remaining_urls = (count + remaining_urls) - cache_urls.length;
                remaining_urls_store[bucket_id] = remaining_urls;
                return fn(cache_urls);
            } else {
                cache_calls[domain + bucket_id.toString()] = true;
            }

            pool.mongodb_collection.find({
                "domain": domain,
                "bucket_id": null,
                "bucketed": false,
                "fetch_interval": interval,
                "partitionedBy": config.getConfig("bot_name")
            }, {
                limit: count * 5,
                sort: {
                    level: 1
                }
            }).toArray(function loadFromDB(err, object) {
                if (!check.assigned(err)) {
                    msg('Loaded ' + object.length + ' items for the cache of ' + domain, 'success');
                } else {
                    msg('Failed to load urls for ' + domain + ': ' + err, 'error');
                }

                //push everything fetched into the shared cache, then draw the
                //shortfall back out of it
                _.each(object, function(doc) {
                    message.get('proxy_cache').pushURL(doc);
                });

                cache_urls = cache_urls.concat(message.get('proxy_cache').fetchURLs(domain, count + remaining_urls - cache_urls.length));
                remaining_urls = (count + remaining_urls) - cache_urls.length;
                remaining_urls_store[bucket_id] = remaining_urls;
                fn(cache_urls);
            });
        } else {
            cache_calls[domain + bucket_id.toString()] = true;
            remaining_urls = (count + remaining_urls) - cache_urls.length;
            remaining_urls_store[bucket_id] = remaining_urls;
            fn(cache_urls);
        }
    };
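
    /**
        Pops and returns the next domain from the priority queue for the
        given fetch interval.
        @param {String|Number} interval fetch-interval label
        @returns {Object} the highest-priority domain entry
    */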
    this.getCurrentDomain = function bucketoperation_getCurrentDomain(interval) {
        //bucket_priority is the captured local, not a property of `that`
        var domain = bucket_priority[interval].splice(0, 1)[0];
        return domain;
    };


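    /**
        Periodic bucket builder. For the current domain group it dequeues
        urls for every distinct fetch interval (cache first, then db) and,
        once all dequeues complete, hands the assembled hashes to pusher().
        Releases bucket_creator_lock when there is nothing to do.
    */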
    this.creator = function bucketoperation_creator() {

        //ping periodically so that we do not run short of buckets while
        //links remain in the mongodb cache; new buckets are generated per
        //refresh interval and spread uniformly across domains
        if (!check.assigned(links_store)) {
            message.get('bucket_creator_lock').release();
            return;
        }

        var hashes = {};

        _.each(message.get('distinct_fetch_intervals'), function(e, k) {
            //create an empty bucket skeleton for every declared fetch interval
            hashes[k] = {};
            hashes[k]["_id"] = ObjectId();
            hashes[k]["links"] = [];
        });

        var done = 0;
        var tmp = message.get('domain_group');

        //rotate the domain-group queue: take the head and push it back to the tail
        var sub_group = tmp.splice(0, 1)[0]; //dequeue
        tmp.push(sub_group); //enqueue
        message.set('domain_group', tmp);

        var domains = sub_group["domains"];

        //process one domain group at a time

        for (var i = 0; i < domains.length; i++) {
            (function(dd, limit) {

                //each priority point is worth batch_size / 100 urls
                var ratio = Math.floor(config.getConfig("batch_size") / 100);
                var eachh = ratio * dd["priority"];
                var key = dd["_id"];

                //two rounds per domain, one dequeue per fetch interval each round
                var rounds = 0;
                while (rounds !== 2) {
                    _.each(message.get('distinct_fetch_intervals'), function(e, k) {
                        //fill the bucket of each distinct fetch interval
                        (function(k) {

                            var pusher = that.pusher;
                            that.dequeue(key, eachh, k, hashes[k]["_id"], function(l) {
                                hashes[k]["domain_group_id"] = sub_group["_id"];
                                hashes[k]["links"] = hashes[k]["links"].concat(l);
                                ++done;

                                if (done === limit) {
                                    //every dequeue has finished; reset the
                                    //per-round guards before pushing
                                    remaining_urls_store = {};
                                    cache_calls = {};
                                    if (check.emptyObject(hashes)) {
                                        message.get('bucket_creator_lock').release();
                                        return;
                                    }
                                    pusher(hashes, function() {
                                        message.get('bucket_creator_lock').release();
                                    });
                                }
                            });

                        })(k);

                    });

                    rounds += 1;
                }

            })(domains[i], domains.length * Object.keys(message.get('distinct_fetch_intervals')).length * 2);
        }

        return;
    };


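    /**
        Scores the assembled hashes, persists their links, and inserts one
        bucket document per fetch interval into the bucket collection.
        @param {Object} hashes fetch-interval label => bucket skeleton
        @param {Function} fn callback(success) invoked once every bucket is handled
    */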
    this.pusher = function bucketoperation_pusher(hashes, fn) {
        try {
            hashes = score.getScore(hashes);
        } catch (err) {
            msg('score.getScore failed: ' + err, 'error');
            fn(true);
            return;
        }

        if (!check.assigned(hashes)) {
            fn(false);
            return;
        }
        var done = 0;
        var counter = _.size(hashes);
        _.each(hashes, function(e, key) {
            (function(key) {
                //links are written to the db first so that duplicates are
                //rejected before a bucket references them
                pool.addLinksToDB(hashes[key], key, function(numOfLinks) {
                    if (!check.assigned(numOfLinks) || numOfLinks === 0) {
                        //nothing was inserted for this interval; count it as done
                        ++done;
                        if (done === counter) {
                            fn(true);
                            return;
                        }
                        return;
                    }
                    var stamp1 = new Date().getTime();
                    var links_to_be_inserted = _.pluck(hashes[key]["links"], 'url');
                    var domain_id = hashes[key]["domain_group_id"];
                    pool.bucket_collection.insert({
                        "_id": hashes[key]["_id"],
                        "links": links_to_be_inserted,
                        "domains": domain_id,
                        "score": hashes[key]["score"],
                        "recrawlLabel": key,
                        "underProcess": false,
                        "insertedBy": config.getConfig("bot_name"),
                        "recrawlAt": stamp1,
                        "numOfLinks": numOfLinks
                    }, function bucketInsert(err, results) {
                        if (err) {
                            msg(("pool.addToPool " + err), "error");
                        } else {
                            msg(("Updated bucket " + results["ops"][0]["_id"]), "success");
                            bot_obj.updateStats("createdBuckets", 1);
                        }
                        ++done;
                        if (done === counter) {
                            fn(true);
                            return;
                        }
                    });
                });
            })(key);
        });
    };


    //kick off the periodic bucket creator; the lock prevents overlapping runs
    bucket_timer = setInterval(function() {
        if (!message.get('webappOnly') && message.get('bucket_creator_lock').enter()) {
            that.creator();
        }
    }, bucket_time_interval);
    message.get('my_timers').push(bucket_timer);

    //logs through the shared log object when attached, else console.log
    function msg(text, level) {
        if (!check.assigned(message.get('log'))) {
            console.log(text);
            return;
        }
        //arguments.callee is disallowed in strict mode; the caller name is a
        //best-effort log tag
        message.get('log').put(text, level, __filename.split('/').pop(), arguments.callee.caller.name.toString());
    }


};


module.exports = Bucket;
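
// Usage sketch (hypothetical wiring; in practice the message object is
// assembled by the crawler bootstrap and already carries 'config', 'logger',
// 'pool', 'bucket_priority', 'links_store', 'bot', 'domain_group',
// 'distinct_fetch_intervals', 'bucket_creator_lock' and 'my_timers'):
//
//   var Bucket = require('./lib/bucket.js');
//   var bucket = new Bucket(message); //periodic creator starts automatically
//
//   //dequeue can also be driven by hand ('daily' is an illustrative
//   //fetch-interval label, not one defined by this module):
//   bucket.dequeue('example.com', 100, 'daily', ObjectId(), function(urls) {
//       console.log('got ' + urls.length + ' urls');
//   });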