Source: tika.js

//https://wiki.apache.org/tika/TikaJAXRS
var proto = require(__dirname + '/lib/proto.js');
process.getAbsolutePath = proto.getAbsolutePath;
var exec = require('child_process').exec;
var URLClass = require(__dirname + "/lib/url.js");
var fs = require('fs');
var _ = require('underscore');
//using dnscache
//from now on all the calls made to dns module are wrapped by the cache
//this will provide dns cache in request module
var dns = require('dns'),
dnscache = require('dnscache')({
    "enable" : true,
    "ttl" : 300,
    "cachesize" : 1000
});


var request = require('request');
var Logger = require(__dirname + "/lib/logger.js");
var check = require("check-types");
var log;
var crypto = require('crypto');
var config = require(__dirname + "/lib/spawn_config.js");

var MongoClient = require('mongodb').MongoClient;
var Message = require(__dirname + '/lib/message.js');
var Lock = require(__dirname + '/lib/lock.js');
var message;


/**
    MongoDB-backed queue of pending tika jobs for one bot.
    @constructor
    @author Tilak Patidar <tilakpatidar@gmail.com>
    @param {Message} message_obj - provides 'mongodb_pool' (a connected db handle)
        and 'config' (read for "bot_name").

*/
var TikaQueue = function(message_obj) {

    var db = message_obj.get('mongodb_pool');
    var config = message_obj.get('config');
    var message = message_obj;

    // Per-bot collections; only the job queue is used by this object.
    var tika_queue = db.collection(config.getConfig("bot_name") + "_tika_queue");
    var tika_f_queue = db.collection(config.getConfig("bot_name") + "_tika_f_queue");

    /**
        Dequeue up to `num` pending jobs (status 0), marking each one as taken (status 1).
        Calls fn([]) when the query fails, fn(null) when nothing is pending, and
        otherwise fn(jobs) with the jobs that carry fileName, parseFile, status and
        link_details, once every status update has answered.
        @param {Function} fn - callback
        @param {number} num - no of jobs
        @public
    */
    this.dequeue = function(fn, num) {
        tika_queue.find({ "status": 0 }, { limit: num }).toArray(function(err, docs) {
            if (check.assigned(err) || !check.assigned(docs)) {
                return fn([]);
            }

            var pending = docs.length;
            if (pending === 0) {
                return fn(null);
            }

            var accepted = [];
            var isComplete = function(job) {
                return check.assigned(job) &&
                    check.assigned(job.fileName) &&
                    check.assigned(job.parseFile) &&
                    check.assigned(job.status) &&
                    check.assigned(job.link_details);
            };

            _.each(docs, function(job) {
                // The update result is not inspected; the job is considered taken either way.
                tika_queue.update({ "_id": job["_id"] }, { "$set": { status: 1 } }, function() {
                    if (isComplete(job)) {
                        accepted.push(job);
                    }
                    pending -= 1;
                    if (pending === 0) {
                        return fn(accepted);
                    }
                });
            });
        });
    };


    /**
        Remove a job from the queue by its _id (fire-and-forget; the result is ignored).
        @public
        @param {String} idd
    */
    this.remove = function(idd) {
        tika_queue.removeOne({ _id: idd }, function() {});
    };


};


/**
    Tika wrapper class: downloads queued documents into pdf-store, extracts their
    text and metadata through a local tika-server.jar, runs the job's pluggable
    parser and dumps the result for indexing.
    @constructor
    @param {Message} message_obj - must provide 'config'; 'mongodb_pool' is set on it
        once the MongoDB connection is established.
    @author Tilak Patidar<tilakpatidar@gmail.com>

*/
var Tika = function(message_obj) {

    /**
        Tika queue object. Undefined until the MongoDB connection is ready.
        @type {TikaQueue}
        @private
    */
    var tika_queue_obj;

    /**
        Locks the processNext while current operations are still running.
        @type {Lock}
        @private

    */
    var tika_queue_lock = new Lock();
    var config = message_obj.get('config');
    var URL = new URLClass(message_obj);
    var message = message_obj;

    /**
        Collection receiving one entry per dumped document (see indexTikaDoc).
        @private
    */
    var tika_f_queue;

    var that = this;

    // Log level used for stack traces: "error" when tika_debug is set, "no_verbose" otherwise.
    var color_debug = config.getConfig("tika_debug") ? "error" : "no_verbose";

    //set proxy
    process.http_proxy = config.getConfig("http", "http_proxy");
    process.https_proxy = config.getConfig("http", "https_proxy");

    // Clear pdf-store: downloads left over from a previous run may be corrupted.
    var files = fs.readdirSync(__dirname + '/pdf-store/');
    for (var i = 0; i < files.length; i++) {
        if (files[i].indexOf(".") === 0) {
            // do not delete hidden files
            continue;
        }
        fs.unlinkSync(__dirname + '/pdf-store/' + files[i]);
    }
    msg("pdf-store cache reset", "success");


    var serverOptions = {
        'auto_reconnect': true,
        'poolSize': config.getConfig("pool-size")
    };

    var mongodb = config.getConfig("mongodb", "mongodb_uri");

    MongoClient.connect(mongodb, serverOptions, function(err, db) {
        if (check.assigned(err) || !check.assigned(db)) {
            // Without a db handle the queue object is never created and processNext stays idle.
            msg("Could not connect to MongoDB, tika queue will not be processed", "error");
            if (check.assigned(err)) {
                msg(err.stack, color_debug);
            }
            return;
        }
        message.set("mongodb_pool", db);
        var tika_queue = db.collection(config.getConfig("bot_name") + "_tika_queue");
        // Jobs still marked as taken (status 1) were not completed by the previous run;
        // put them back. "$set" keeps the rest of each job document intact (a plain
        // object would replace the whole document) and "multi" reverts all of them.
        tika_queue.update({
            "status": 1
        }, {
            "$set": {
                "status": 0
            }
        }, {
            "multi": true
        }, function revert_queue(err) {
            if (check.assigned(err)) {
                msg(err.stack, color_debug);
            }
            msg("pdf-store queue reverted", "success");
            tika_f_queue = db.collection(config.getConfig("bot_name") + "_tika_f_queue");
            that.startServer();
            tika_queue_obj = new TikaQueue(message_obj);
        });
    });

    /**
        Truncates (or creates) the tika pid file; failures are logged, not thrown.
        @param {String} pidFile
        @private
    */
    var resetPidFile = function resetPidFile(pidFile) {
        try {
            fs.writeFileSync(pidFile, "");
        } catch (e) {
            msg(e.stack, color_debug);
        }
    };

    /**
        Starts tika-server.jar as spawned process, after killing the instance whose
        pid is recorded in db/pids/tikaPID.txt (if any). The new pid is sent to the
        parent process as {"tikaPID": pid}.
        @public
    */
    this.startServer = function startServer() {
        var pidFile = __dirname + "/db/pids/tikaPID.txt";
        //first kill an old instance of tika if exists
        var pid = "";
        try {
            pid = fs.readFileSync(pidFile).toString().trim();
            msg("Trying to kill an old instance of tika if active", "info");
        } catch (err) {
            // pid file does not exist yet: create it empty
            resetPidFile(pidFile);
        }

        try {
            if (pid !== "") {
                process.kill(parseInt(pid, 10));
            }
        } catch (err) {
            if (err.code === "ESRCH") {
                // no process with that pid: it has already exited
                msg("Old tika instance was already killed", "info");
                msg("Reset tika pid file", "info");
                resetPidFile(pidFile);
            } else {
                msg(err.stack, color_debug, err.type);
            }
        }

        // The jar path is quoted so an install directory containing spaces still works.
        // NOTE(review): exec buffers the child's stdout/stderr (bounded by maxBuffer) for the
        // server's whole lifetime; child_process.spawn with ignored stdio may fit better — confirm.
        var d = exec('java -jar "' + __dirname + '/lib/tika-server-1.11.jar" -h ' + config.getConfig("tika_host"), function tika_jar_exec(error, stdout, stderr) {
            // exec invokes this callback only once the java process has exited.
            if (check.assigned(error)) {
                msg(error.stack, color_debug);
                msg('Server is already running', "info");
            }
        });
        msg("[SUCCESS] Tika server started", "success");
        try {
            // process.send only exists when running as a child process.
            process.send({
                "tikaPID": d.pid
            });
        } catch (e) {
            msg(e.stack, color_debug);
        }

    };

    /**
        Submit a file for download and parsing: downloads url into pdf-store,
        extracts it with tika-server and removes the downloaded copy again.
        @param {String} url
        @param {Function} callback - callback(err, body). On success err is undefined and
            body is the {text, meta} object from extractText; on failure body is null.
        @public
    */
    this.submitFile = function submitFile(url, callback) {
        // Best-effort cleanup of a partial or unusable download; removal errors are ignored.
        var discard = function() {
            that.removeFile(url, function() {});
        };

        that.addFileToStore(url, function addFileToStore1(err1) {
            if (err1) {
                discard();
                // "error" is mapped to a generic download failure; other codes pass through.
                return callback(err1 === "error" ? "tikaDownloadFailed" : err1, null);
            }
            msg(("[SUCCESS] File " + url + " added to store"), "success");
            that.extractText(url, function extractText1(err2, body) {
                if (err2) {
                    discard();
                    return callback("tikaExtractFailed", null);
                }
                that.removeFile(url, function removeFile1(err3) {
                    if (err3) {
                        return callback("tikaRemoveDownloadedFailed", null);
                    }
                    msg(("[SUCCESS] File " + url + " removed from store"), "success");
                    callback(undefined, body);
                });
            });
        });
    };


    /**
        To start download a file into pdf-store.
        The download is aborted when the announced or received size exceeds
        tika_content_length, or when it runs longer than tika_timeout (ms).
        @param {String} url
        @param {Function} callback - called exactly once with null on success or an
            error code string ("TikeFileStreamError", "TikaDownloadFailed",
            "TikaContentOverflow", "TikaContentTimeout").
        @public
    */
    this.addFileToStore = function addFileToStore(url, callback) {
        var finished = false;
        var response = null;
        var req = null;
        var done_len = 0;
        var init_time = new Date().getTime();
        var max_len = config.getConfig("tika_content_length");
        var timeout = config.getConfig("tika_timeout");

        // Report the outcome exactly once: a failed or aborted download can fire several
        // error/close events, and a second callback would double count the job.
        var finish = function(result) {
            if (finished) {
                return;
            }
            finished = true;
            callback(result);
        };

        var st = fs.createWriteStream(that.getFileName(url)).on('error', function fsstream_addfilestore(err) {
            msg(err.stack, color_debug);
            if (req !== null) {
                req.abort();
            }
            return finish("TikeFileStreamError");
        });

        // Detaches the response (if any) from the file and closes the file.
        var close_file = function() {
            if (response !== null) {
                response.unpipe(st);
            }
            st.end();
        };

        // Stops the transfer when a size or time limit is exceeded.
        var abort_download = function(reason) {
            req.abort();
            close_file();
            finish(reason);
        };

        var separateReqPool = {
            maxSockets: config.getConfig("tika_max_sockets_per_host")
        };

        req = request({
            uri: url,
            pool: separateReqPool,
            headers: config.getConfig("tika_headers"),
            // also fail connections that stall without delivering any data
            timeout: timeout
        });

        // Without an 'error' listener a DNS/connection failure would be an unhandled
        // 'error' event and crash the process.
        req.on('error', function req_on_error(err) {
            msg(err.stack, color_debug);
            close_file();
            return finish("TikaDownloadFailed");
        });

        req.on("response", function res_on_response(res) {
            if (finished) {
                // the file stream already failed; do not download into it
                req.abort();
                return;
            }
            response = res;
            var len = parseInt(res.headers['content-length'], 10);
            if (!check.assigned(len) || !check.number(len)) {
                len = 0;
            }
            // Reject early when the server announces a body larger than allowed.
            if (len > max_len) {
                msg("content-length is more than specified", "error");
                return abort_download("TikaContentOverflow");
            }
            res.on("data", function res_on_data(chunk) {
                if (finished) {
                    return;
                }
                done_len += chunk.length;
                if ((new Date().getTime() - init_time) > timeout) {
                    msg("Connection timedout change tika_timeout setting in config", "error");
                    return abort_download("TikaContentTimeout");
                }
                // Content-Length may be missing or wrong, so also count the bytes actually received.
                if (done_len > max_len) {
                    msg("content-length is more than specified", "error");
                    return abort_download("TikaContentOverflow");
                }
            });
            res.on('error', function res_on_error(err) {
                msg((err && err.stack) || String(err), color_debug);
                close_file();
                return finish("TikaDownloadFailed");
            });
            res.pipe(st).on('close', function res_on_close() {
                // 'close' also follows a failed or aborted download; finish() ignores it then.
                return finish(null);
            });
        });
    };


    /**
        Removes a file from pdf-store
        @param {String} url
        @param {Function} cal - cal("TikaStoreRemoveError") on failure, cal(null) on success
        @public
    */
    this.removeFile = function removeFile(url, cal) {

        fs.unlink(that.getFileName(url), function(err) {

            if (err) {
                cal("TikaStoreRemoveError");
            } else {
                cal(null);
            }
        });

    };


    /**
        Extracts text from downloaded document in pdf-store. Using tika-server.jar api:
        the file is PUT to /tika (plain text) and then to /meta (JSON metadata).
        @param {String} url
        @param {Function} callback - called exactly once: callback(null, {text, meta}) on
            success, otherwise an error code with {} or the partially filled result.
        @public

    */
    this.extractText = function extractText(url, callback) {
        var finished = false;
        // Several error paths can fire for one extraction (stream errors, request 'error'
        // events and request callbacks); only the first outcome is reported.
        var finish = function(err, result) {
            if (finished) {
                return;
            }
            finished = true;
            callback(err, result);
        };
        var tika_base = 'http://' + config.getConfig("tika_host") + ':' + config.getConfig('tika_port');

        try {
            var dic = {};
            var source = fs.createReadStream(that.getFileName(url));
            source.on('error', function source_on_error1(err) {
                msg(err.stack, color_debug);
                finish("TikaFileStoreReadError", {});
            });
            source.pipe(request.put({
                url: tika_base + '/tika',
                headers: {
                    'Accept': 'text/plain'
                }
            }, function(err, httpResponse, body) {
                if (check.assigned(err)) {
                    // logged by the 'error' listener below
                    return finish("TikaServerResponseError", {});
                }
                dic["text"] = body;
                var metaSource = fs.createReadStream(that.getFileName(url)).on('error', function source_create(err) {
                    msg(err.stack, color_debug);
                    finish("TikaFileStoreReadError", {});
                });

                metaSource.pipe(request.put({
                    url: tika_base + '/meta',
                    headers: {
                        'Accept': 'application/json'
                    }
                }, function source_response(err1, httpResponse1, body1) {
                    if (check.assigned(err1)) {
                        msg(err1.stack, color_debug);
                        msg("tika.extractText for " + url, "error");
                        return finish("TikaServerResponseError", dic);
                    }
                    try {
                        dic["meta"] = JSON.parse(body1);
                    } catch (e) {
                        // an empty or non-JSON /meta response ends up here
                        msg(e.stack, color_debug);
                        msg("tika.extractText for " + url, "error");
                        return finish("TikaServerResponseError", dic);
                    }
                    // Called outside the try so an exception thrown by the caller is not
                    // mistaken for a parse error and reported a second time.
                    msg("tika.extractText for " + url, "success");
                    finish(null, dic);
                }));
            })).on('error', function source_on_error(err) {
                msg(err.stack, color_debug);
                finish("TikaServerResponseError", {});
            });
        } catch (err) {
            msg(err.stack, color_debug);
            finish("TikaExtractFailed", {});
        }

    };


    /**
        Dumps the job file for indexing: writes link.details as JSON into
        pdf-store-parsed and, once the file is completely written, records it in
        the tika_f_queue collection.
        @param {Object} link - link object built with URL.url(); link.details.urlID names the file
        @public

    */
    this.indexTikaDoc = function(link) {
        var filename = that.getParsedFileName(link.details.urlID);
        var stream = fs.createWriteStream(filename);
        stream.on("error", function tika_doc_write_error(err) {
            msg(err.stack, color_debug);
        });
        // Writable streams emit 'finish' (never 'end') once all data is flushed; the queue
        // entry is inserted only then so a reader never finds a missing or half-written file.
        stream.on("finish", function tika_doc_index() {
            tika_f_queue.insert({
                "content": filename,
                "urlID": link.details.urlID
            }, function tika_doc_queued(err) {
                if (check.assigned(err)) {
                    msg(err.stack, color_debug);
                    return;
                }
                msg('Tika doc dumped for indexing', 'info');
            });
        });
        // end() writes the JSON and closes the file descriptor.
        stream.end(JSON.stringify(link.details));
    };


    /**
        Indexes a placeholder document (empty parsed data and content) for a job
        that could not be processed, with `statusCode` as its status.
        @private
    */
    var indexFailedJob = function indexFailedJob(fileName, link_details, statusCode) {
        try {
            var link = URL.url(fileName);
            link.setUrlId(link_details.urlID);
            link.setStatusCode(statusCode);
            link.setParsed({});
            link.setResponseTime(0);
            link.setContent({});
            that.indexTikaDoc(link);
        } catch (errr) {
            msg(errr.stack, color_debug);
        }
    };


    /**
        Runs the job's pluggable parser (parsers/<parseFile>) over the extracted body
        and indexes the result with status 200. A parser that cannot be loaded or that
        throws synchronously is indexed as "tikaUnknownError" instead of escaping as an
        uncaught exception.
        @private
    */
    var parseAndIndex = function parseAndIndex(fileName, parseFile, link_details, body) {
        try {
            var Parser = require(__dirname + "/parsers/" + parseFile);
            var parser_obj = new Parser(config);
            parser_obj.parse(body, fileName, function(dic) {
                //pluggable parser
                msg("fetchFile for " + fileName, "success");
                try {
                    var link = URL.url(fileName);
                    link.setUrlId(link_details.urlID);
                    link.setStatusCode(200);
                    // NOTE(review): parser output is read positionally (dic[1] parsed data,
                    // dic[3] content) — confirm against the parser contract.
                    link.setParsed(dic[1]);
                    link.setResponseTime(0);
                    link.setContent(dic[3]);
                    if (check.assigned(body) && check.assigned(body["text"])) {
                        // md5 of the extracted text
                        var md5sum = crypto.createHash('md5');
                        md5sum.update(body["text"]);
                        link.setContentMd5(md5sum.digest('hex'));
                    }
                    that.indexTikaDoc(link);
                } catch (e) {
                    msg(e.stack, color_debug);
                }
            });
        } catch (e) {
            msg(e.stack, color_debug);
            msg("error from fetchFile for " + fileName, "error");
            indexFailedJob(fileName, link_details, "tikaUnknownError");
        }
    };


    /**
        Downloads, extracts and parses one dequeued job and indexes the outcome.
        onDone is invoked exactly once, as soon as the download/extract step has
        reported (parsing may still be running), and the job is then removed from
        the queue.
        @param {Object} job - queue document with fileName, parseFile and link_details
        @param {Function} onDone
        @private
    */
    var processJob = function processJob(job, onDone) {
        var fileName = job.fileName;
        var link_details = job.link_details;
        // NOTE(review): the queue entry is removed by _id using link_details.urlID, which
        // assumes jobs are stored with _id === urlID — confirm against the producer.
        var uniqueId = link_details.urlID;
        var completed = false;
        var complete = function() {
            if (completed) {
                return;
            }
            completed = true;
            onDone();
            tika_queue_obj.remove(uniqueId);
        };

        try {
            that.submitFile(fileName, function tika_submit_file(err, body) {
                if (err) {
                    msg("error from fetchFile for " + fileName, "error");
                    indexFailedJob(fileName, link_details, err);
                } else {
                    parseAndIndex(fileName, job.parseFile, link_details, body);
                }
                complete();
            });
        } catch (err) {
            msg("error from fetchFile for " + fileName, "error");
            indexFailedJob(fileName, link_details, "tikaUnknownError");
            complete();
        }
    };


    /**
        Dequeues jobs from tika queue and process them. Runs in a setInterval.
        The lock is held until every job of the batch has reported.
        @private
    */
    var processNext = function processNext() {
        // Nothing to do until the MongoDB connection has created the queue object.
        if (!check.assigned(tika_queue_obj)) {
            return;
        }
        if (!tika_queue_lock.enter()) {
            return;
        }

        try {
            tika_queue_obj.dequeue(function tika_dequeue(li) {
                // null: nothing pending; []: query failed or no job passed validation.
                if (!check.assigned(li) || li.length === 0) {
                    tika_queue_lock.release();
                    return;
                }

                var done = 0;
                var jobDone = function() {
                    ++done;
                    if (done === li.length) {
                        msg("Tika batch completed", 'success');
                        tika_queue_lock.release();
                    }
                };
                _.each(li, function(obj) {
                    processJob(obj, jobDone);
                });

            }, config.getConfig("tika_batch_size"));
        } catch (e) {

            console.log(e, "error");
            tika_queue_lock.release();
        }
    };

    /**
        Converts url into pdf-store file location ("/" replaced by "##").
        @param {String} url

    */
    this.getFileName = function getFileName(url) {
        return __dirname + "/pdf-store/" + url.replace(/\//gi, "##");
    };

    /**
        Converts an identifier into a pdf-store-parsed JSON file location
        ("/" replaced by "##"). indexTikaDoc calls it with link.details.urlID.
        @param {String} url

    */
    this.getParsedFileName = function getParsedFileName(url) {
        return __dirname + "/pdf-store-parsed/" + url.replace(/\//gi, "##") + ".json";
    };


    /**
        Runs in setInterval; if processNext has been locked for 10 minutes or more,
        the lock is released so processing can resume.
        @private
    */
    function failSafe() {

        if (tika_queue_lock.getLastLockTime() == null) {
            //has'nt been locked yet
            return;
        }
        if ((new Date().getTime() - tika_queue_lock.getLastLockTime()) >= (1000 * 60 * 10)) { //10 min check
            msg("Unlocking tika queue", 'info');
            tika_queue_lock.release();

        }

    }


    setInterval(processNext, 1000);
    setInterval(failSafe, 1000);

};









module.exports = Tika;


if (require.main === module) {

    /**
        Child-process bootstrap. On a message whose first key is "init", builds the
        shared Message object (config, URL filter regexes, links store, logger) and
        starts a Tika instance. Messages with any other first key are ignored.
    */
    process.on("message", function process_on_msg(data) {
        var key = Object.keys(data)[0];
        if (key !== "init") {
            return;
        }

        message = new Message();
        var initArgs = data[key];
        // The module-level config (lib/spawn_config.js) is replaced by config.init's return value.
        config = config.init(initArgs[0], initArgs[1], initArgs[2]);
        // NOTE(review): regex_urlfilter and tika_obj are not declared with var anywhere in
        // this file, so these assignments create implicit globals — confirm nothing
        // relies on that before scoping them.
        regex_urlfilter = {};
        regex_urlfilter.accept = config.getConfig("accept_regex");
        regex_urlfilter.reject = config.getConfig("reject_regex");
        message.set('config', config);
        message.set('regex_urlfilter', regex_urlfilter);
        // the fourth init argument is stored as the links store
        message.set('links_store', initArgs[3]);
        log = new Logger(message);
        message.set('log', log);

        tika_obj = new Tika(message);
    });
}

/**
    Logs through the module-level Logger. Each entry is tagged with this file's name
    and the name of the calling function. When no Logger has been created yet, the
    entry goes to stdout instead.
    @param {String} text - message to log
    @param {String} level - level passed to log.put (this file uses e.g. "info", "success", "error")
*/
function msg(text, level) {
    // Function.caller (sloppy mode only) is null when msg is called from top-level or
    // strict-mode code, so it must not be dereferenced unconditionally.
    var caller = arguments.callee.caller;
    var callerName = (caller && caller.name) ? caller.name.toString() : "";
    var fileName = __filename.split('/').pop();
    if (log == null) {
        // log is only created by the "init" message handler; when this module is
        // required as a library it can be unset.
        console.log("[" + level + "] " + fileName + " " + callerName + ": " + text);
        return;
    }
    log.put(text, level, fileName, callerName);
}