SqliteJobQueue.java

package com.birbit.android.jobqueue.persistentQueue.sqlite;

import com.birbit.android.jobqueue.Constraint;
import com.birbit.android.jobqueue.Job;
import com.birbit.android.jobqueue.JobHolder;
import com.birbit.android.jobqueue.JobManager;
import com.birbit.android.jobqueue.JobQueue;
import com.birbit.android.jobqueue.Params;
import com.birbit.android.jobqueue.config.Configuration;
import com.birbit.android.jobqueue.log.JqLog;

import android.database.Cursor;
import android.database.sqlite.SQLiteDatabase;
import android.database.sqlite.SQLiteDoneException;
import android.database.sqlite.SQLiteStatement;
import android.support.annotation.NonNull;
import android.support.annotation.VisibleForTesting;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * Persistent Job Queue that keeps its data in an sqlite database.
 */
public class SqliteJobQueue implements JobQueue {
    @SuppressWarnings("FieldCanBeLocal")
    private DbOpenHelper dbOpenHelper;
    private final long sessionId;
    private SQLiteDatabase db;
    private SqlHelper sqlHelper;
    private JobSerializer jobSerializer;
    // we keep a list of cancelled jobs in memory not to return them in subsequent find by tag
    // queries. Set is cleaned when item is removed
    private Set<String> pendingCancelations = new HashSet<>();
    private FileStorage jobStorage;
    private final StringBuilder reusedStringBuilder = new StringBuilder();
    private final WhereQueryCache whereQueryCache;

    public SqliteJobQueue(Configuration configuration, long sessionId, JobSerializer serializer) {
        this.sessionId = sessionId;
        jobStorage = new FileStorage(configuration.getAppContext(), "jobs_" + configuration.getId());
        whereQueryCache = new WhereQueryCache(sessionId);
        dbOpenHelper = new DbOpenHelper(configuration.getAppContext(),
                configuration.isInTestMode() ? null : ("db_" + configuration.getId()));
        db = dbOpenHelper.getWritableDatabase();
        sqlHelper = new SqlHelper(db, DbOpenHelper.JOB_HOLDER_TABLE_NAME,
                DbOpenHelper.ID_COLUMN.columnName, DbOpenHelper.COLUMN_COUNT,
                DbOpenHelper.JOB_TAGS_TABLE_NAME, DbOpenHelper.TAGS_COLUMN_COUNT, sessionId);
        this.jobSerializer = serializer;
        if (configuration.resetDelaysOnRestart()) {
            sqlHelper.resetDelayTimesTo(JobManager.NOT_DELAYED_JOB_DELAY);
        }
        cleanupFiles();
    }

    private void cleanupFiles() {
        Cursor cursor = db.rawQuery(sqlHelper.LOAD_ALL_IDS_QUERY, null);
        Set<String> jobIds = new HashSet<>();
        try {
            while (cursor.moveToNext()) {
                jobIds.add(cursor.getString(0));
            }
        } finally {
            cursor.close();
        }
        jobStorage.truncateExcept(jobIds);
    }

    @VisibleForTesting
    public SQLiteDatabase getDb() {
        return db;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean insert(@NonNull JobHolder jobHolder) {
        persistJobToDisk(jobHolder);
        if (jobHolder.hasTags()) {
            return insertWithTags(jobHolder);
        }
        final SQLiteStatement stmt = sqlHelper.getInsertStatement();
        stmt.clearBindings();
        bindValues(stmt, jobHolder);
        long insertId = stmt.executeInsert();
        // insert id is a alias to row_id
        jobHolder.setInsertionOrder(insertId);
        return insertId != -1;
    }

    private void persistJobToDisk(@NonNull JobHolder jobHolder) {
        try {
            jobStorage.save(jobHolder.getId(), jobSerializer.serialize(jobHolder.getJob()));
        } catch (IOException e) {
            throw new RuntimeException("cannot save job to disk", e);
        }
    }

    @Override
    public void substitute(@NonNull JobHolder newJob, @NonNull JobHolder oldJob) {
        db.beginTransaction();
        try {
            remove(oldJob);
            insert(newJob);
            db.setTransactionSuccessful();
        } finally {
            db.endTransaction();
        }
    }

    private boolean insertWithTags(JobHolder jobHolder) {
        final SQLiteStatement stmt = sqlHelper.getInsertStatement();
        final SQLiteStatement tagsStmt = sqlHelper.getInsertTagsStatement();
        db.beginTransaction();
        try {
            stmt.clearBindings();
            bindValues(stmt, jobHolder);
            boolean insertResult = stmt.executeInsert() != -1;
            if (!insertResult) {
                return false;
            }
            for (String tag : jobHolder.getTags()) {
                tagsStmt.clearBindings();
                bindTag(tagsStmt, jobHolder.getId(), tag);
                tagsStmt.executeInsert();
            }
            db.setTransactionSuccessful();
            return true;
        } catch (Throwable t) {
            JqLog.e(t, "error while inserting job with tags");
            return false;
        }
        finally {
            db.endTransaction();
        }
    }

    private void bindTag(SQLiteStatement stmt, String jobId, String tag) {
        stmt.bindString(DbOpenHelper.TAGS_JOB_ID_COLUMN.columnIndex + 1, jobId);
        stmt.bindString(DbOpenHelper.TAGS_NAME_COLUMN.columnIndex + 1, tag);
    }

    private void bindValues(SQLiteStatement stmt, JobHolder jobHolder) {
        if (jobHolder.getInsertionOrder() != null) {
            stmt.bindLong(DbOpenHelper.INSERTION_ORDER_COLUMN.columnIndex + 1, jobHolder.getInsertionOrder());
        }

        stmt.bindString(DbOpenHelper.ID_COLUMN.columnIndex + 1, jobHolder.getId());
        stmt.bindLong(DbOpenHelper.PRIORITY_COLUMN.columnIndex + 1, jobHolder.getPriority());
        if(jobHolder.getGroupId() != null) {
            stmt.bindString(DbOpenHelper.GROUP_ID_COLUMN.columnIndex + 1, jobHolder.getGroupId());
        }
        stmt.bindLong(DbOpenHelper.RUN_COUNT_COLUMN.columnIndex + 1, jobHolder.getRunCount());
        stmt.bindLong(DbOpenHelper.CREATED_NS_COLUMN.columnIndex + 1, jobHolder.getCreatedNs());
        stmt.bindLong(DbOpenHelper.DELAY_UNTIL_NS_COLUMN.columnIndex + 1, jobHolder.getDelayUntilNs());
        stmt.bindLong(DbOpenHelper.RUNNING_SESSION_ID_COLUMN.columnIndex + 1, jobHolder.getRunningSessionId());
        stmt.bindLong(DbOpenHelper.REQUIRED_NETWORK_TYPE_OLUMN.columnIndex + 1,
                jobHolder.getRequiredNetworkType());
        stmt.bindLong(DbOpenHelper.DEADLINE_COLUMN.columnIndex + 1,
                jobHolder.getDeadlineNs());
        stmt.bindLong(DbOpenHelper.CANCEL_ON_DEADLINE_COLUMN.columnIndex + 1,
                jobHolder.shouldCancelOnDeadline() ? 1 : 0);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean insertOrReplace(@NonNull JobHolder jobHolder) {
        if (jobHolder.getInsertionOrder() == null) {
            return insert(jobHolder);
        }
        persistJobToDisk(jobHolder);
        jobHolder.setRunningSessionId(JobManager.NOT_RUNNING_SESSION_ID);
        SQLiteStatement stmt = sqlHelper.getInsertOrReplaceStatement();
        stmt.clearBindings();
        bindValues(stmt, jobHolder);
        boolean result = stmt.executeInsert() != -1;
        JqLog.d("reinsert job result %s", result);
        return result;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void remove(@NonNull JobHolder jobHolder) {
        delete(jobHolder.getId());
    }

    private void delete(String id) {
        pendingCancelations.remove(id);
        db.beginTransaction();
        try {
            SQLiteStatement stmt = sqlHelper.getDeleteStatement();
            stmt.clearBindings();
            stmt.bindString(1, id);
            stmt.execute();
            SQLiteStatement deleteTagsStmt = sqlHelper.getDeleteJobTagsStatement();
            deleteTagsStmt.bindString(1, id);
            deleteTagsStmt.execute();
            db.setTransactionSuccessful();
            jobStorage.delete(id);
        } finally {
            db.endTransaction();
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public int count() {
        SQLiteStatement stmt = sqlHelper.getCountStatement();
        stmt.clearBindings();
        stmt.bindLong(1, sessionId);
        return (int) stmt.simpleQueryForLong();
    }

    @Override
    public int countReadyJobs(@NonNull Constraint constraint) {
        final Where where = createWhere(constraint);
        final long result = where.countReady(db, reusedStringBuilder).simpleQueryForLong();
        return (int) result;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public JobHolder findJobById(@NonNull String id) {
        Cursor cursor = db.rawQuery(sqlHelper.FIND_BY_ID_QUERY, new String[]{id});
        try {
            if(!cursor.moveToFirst()) {
                return null;
            }
            return createJobHolderFromCursor(cursor);
        } catch (InvalidJobException e) {
            JqLog.e(e, "invalid job on findJobById");
            return null;
        } finally {
            cursor.close();
        }
    }

    @NonNull
    @Override
    public Set<JobHolder> findJobs(@NonNull Constraint constraint) {
        final Where where = createWhere(constraint);
        String selectQuery = where.findJobs(sqlHelper);
        Cursor cursor = db.rawQuery(selectQuery, where.args);
        Set<JobHolder> jobs = new HashSet<>();
        try {
            while (cursor.moveToNext()) {
                jobs.add(createJobHolderFromCursor(cursor));
            }
        } catch (InvalidJobException e) {
            JqLog.e(e, "invalid job found by tags.");
        } finally {
            cursor.close();
        }

        return jobs;
    }

    @Override
    public void onJobCancelled(JobHolder holder) {
        pendingCancelations.add(holder.getId());
        setSessionIdOnJob(holder);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public JobHolder nextJobAndIncRunCount(@NonNull Constraint constraint) {
        final Where where = createWhere(constraint);
        //we can even keep these prepared but not sure the cost of them in db layer
        final String selectQuery = where.nextJob(sqlHelper);
        while (true) {
            Cursor cursor = db.rawQuery(selectQuery, where.args);
            try {
                if (!cursor.moveToNext()) {
                    return null;
                }
                JobHolder holder = createJobHolderFromCursor(cursor);
                setSessionIdOnJob(holder);
                return holder;
            } catch (InvalidJobException e) {
                //delete
                String jobId = cursor.getString(DbOpenHelper.ID_COLUMN.columnIndex);
                if (jobId == null) {
                    JqLog.e("cannot find job id on a retriewed job");
                } else {
                    delete(jobId);
                }
            } finally {
                cursor.close();
            }
        }
    }

    private Where createWhere(Constraint constraint) {
        return whereQueryCache.build(constraint, pendingCancelations, reusedStringBuilder);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Long getNextJobDelayUntilNs(@NonNull Constraint constraint) {
        final Where where = createWhere(constraint);
        try {
            long result = where.nextJobDelayUntil(db, sqlHelper).simpleQueryForLong();
            return result == Params.FOREVER ? null : result;
        } catch (SQLiteDoneException empty) {
            return null;
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void clear() {
        sqlHelper.truncate();
        cleanupFiles();
    }

    /**
     * This method is called when a job is pulled to run.
     * It is properly marked so that it won't be returned from next job queries.
     * <p/>
     * Same mechanism is also used for cancelled jobs.
     *
     * @param jobHolder The job holder to update session id
     */
    private void setSessionIdOnJob(JobHolder jobHolder) {
        SQLiteStatement stmt = sqlHelper.getOnJobFetchedForRunningStatement();
        jobHolder.setRunCount(jobHolder.getRunCount() + 1);
        jobHolder.setRunningSessionId(sessionId);
        stmt.clearBindings();
        stmt.bindLong(1, jobHolder.getRunCount());
        stmt.bindLong(2, sessionId);
        stmt.bindString(3, jobHolder.getId());
        stmt.execute();
    }

    @SuppressWarnings("unused")
    public String logJobs() {
        StringBuilder sb =  new StringBuilder();
        String select = sqlHelper.createSelect(
                null,
                100,
                new SqlHelper.Order(DbOpenHelper.PRIORITY_COLUMN,
                        SqlHelper.Order.Type.DESC),
                new SqlHelper.Order(DbOpenHelper.CREATED_NS_COLUMN,
                        SqlHelper.Order.Type.ASC),
                new SqlHelper.Order(DbOpenHelper.INSERTION_ORDER_COLUMN, SqlHelper.Order.Type.ASC)
        );
        Cursor cursor = db.rawQuery(select, new String[0]);
        try {
            while (cursor.moveToNext()) {
                String id = cursor.getString(DbOpenHelper.ID_COLUMN.columnIndex);
                sb.append(cursor.getLong(DbOpenHelper.INSERTION_ORDER_COLUMN.columnIndex))
                        .append(" ")
                        .append(id).append(" id:")
                        .append(cursor.getString(DbOpenHelper.GROUP_ID_COLUMN.columnIndex))
                        .append(" deadline:")
                        .append(cursor.getLong(DbOpenHelper.DEADLINE_COLUMN.columnIndex))
                        .append(" delay until:")
                        .append(cursor.getLong(DbOpenHelper.DELAY_UNTIL_NS_COLUMN.columnIndex))
                        .append(" sessionId:")
                        .append(cursor.getLong(DbOpenHelper.RUNNING_SESSION_ID_COLUMN.columnIndex))
                        .append(" reqNetworkType:")
                        .append(cursor.getLong(DbOpenHelper.REQUIRED_NETWORK_TYPE_OLUMN.columnIndex));
                Cursor tags = db.rawQuery("SELECT " + DbOpenHelper.TAGS_NAME_COLUMN.columnName
                        + " FROM " + DbOpenHelper.JOB_TAGS_TABLE_NAME + " WHERE "
                        + DbOpenHelper.TAGS_JOB_ID_COLUMN.columnName + " = ?", new String[]{id});
                try {
                    while (tags.moveToNext()) {
                        sb.append(", ").append(tags.getString(0));
                    }
                } finally {
                    tags.close();
                }
                sb.append("\n");

            }
        } finally {
            cursor.close();
        }
        return sb.toString();
    }

    private JobHolder createJobHolderFromCursor(Cursor cursor) throws InvalidJobException {
        String jobId = cursor.getString(DbOpenHelper.ID_COLUMN.columnIndex);
        Job job;
        try {
            job = safeDeserialize(jobStorage.load(jobId));
        } catch (IOException e) {
            throw new InvalidJobException("cannot load job from disk", e);
        }
        if (job == null) {
            throw new InvalidJobException("null job");
        }
        // load tags
        Set<String> tags = loadTags(jobId);
        //noinspection WrongConstant,UnnecessaryLocalVariable
        JobHolder holder = new JobHolder.Builder()
                .insertionOrder(cursor.getLong(DbOpenHelper.INSERTION_ORDER_COLUMN.columnIndex))
                .priority(cursor.getInt(DbOpenHelper.PRIORITY_COLUMN.columnIndex))
                .groupId(cursor.getString(DbOpenHelper.GROUP_ID_COLUMN.columnIndex))
                .runCount(cursor.getInt(DbOpenHelper.RUN_COUNT_COLUMN.columnIndex))
                .job(job)
                .id(jobId)
                .tags(tags)
                .persistent(true)
                .deadline(cursor.getLong(DbOpenHelper.DEADLINE_COLUMN.columnIndex),
                        cursor.getInt(DbOpenHelper.CANCEL_ON_DEADLINE_COLUMN.columnIndex) == 1)
                .createdNs(cursor.getLong(DbOpenHelper.CREATED_NS_COLUMN.columnIndex))
                .delayUntilNs(cursor.getLong(DbOpenHelper.DELAY_UNTIL_NS_COLUMN.columnIndex))
                .runningSessionId(cursor.getLong(DbOpenHelper.RUNNING_SESSION_ID_COLUMN.columnIndex))
                .requiredNetworkType(cursor.getInt(DbOpenHelper.REQUIRED_NETWORK_TYPE_OLUMN.columnIndex))
                .build();
        return holder;
    }

    private Set<String> loadTags(String jobId) {
        Cursor cursor = db.rawQuery(sqlHelper.LOAD_TAGS_QUERY, new String[]{jobId});
        if (cursor.getCount() == 0) {
            //noinspection unchecked
            return Collections.EMPTY_SET;
        }
        final Set<String> tags = new HashSet<>();
        try {
            while (cursor.moveToNext()) {
                tags.add(cursor.getString(0));
            }
        } finally {
            cursor.close();
        }
        return tags;
    }

    private Job safeDeserialize(byte[] bytes) {
        try {
            return jobSerializer.deserialize(bytes);
        } catch (Throwable t) {
            JqLog.e(t, "error while deserializing job");
        }
        return null;
    }

    @SuppressWarnings("WeakerAccess")
    static class InvalidJobException extends Exception {
        InvalidJobException(String detailMessage) {
            super(detailMessage);
        }

        InvalidJobException(String detailMessage, Throwable throwable) {
            super(detailMessage, throwable);
        }
    }

    public static class JavaSerializer implements JobSerializer {

        @Override
        public byte[] serialize(Object object) throws IOException {
            if (object == null) {
                return null;
            }
            ByteArrayOutputStream bos = null;
            try {
                bos = new ByteArrayOutputStream();
                ObjectOutput out = new ObjectOutputStream(bos);
                out.writeObject(object);
                // Get the bytes of the serialized object
                return bos.toByteArray();
            } finally {
                if (bos != null) {
                    bos.close();
                }
            }
        }

        @Override
        public <T extends Job> T deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
            if (bytes == null || bytes.length == 0) {
                return null;
            }
            ObjectInputStream in = null;
            try {
                in = new ObjectInputStream(new ByteArrayInputStream(bytes));
                //noinspection unchecked
                return (T) in.readObject();
            } finally {
                if (in != null) {
                    in.close();
                }
            }
        }
    }

    public interface JobSerializer {
        byte[] serialize(Object object) throws IOException;
        <T extends Job> T deserialize(byte[] bytes) throws IOException, ClassNotFoundException;
    }
}