JobManagerThread.java

package com.birbit.android.jobqueue;

import android.content.Context;
import android.support.annotation.Nullable;

import com.birbit.android.jobqueue.callback.JobManagerCallback;
import com.birbit.android.jobqueue.config.Configuration;
import com.birbit.android.jobqueue.di.DependencyInjector;
import com.birbit.android.jobqueue.log.JqLog;
import com.birbit.android.jobqueue.messaging.Message;
import com.birbit.android.jobqueue.messaging.MessageFactory;
import com.birbit.android.jobqueue.messaging.MessageQueueConsumer;
import com.birbit.android.jobqueue.messaging.PriorityMessageQueue;
import com.birbit.android.jobqueue.messaging.message.AddJobMessage;
import com.birbit.android.jobqueue.messaging.message.CancelMessage;
import com.birbit.android.jobqueue.messaging.message.CommandMessage;
import com.birbit.android.jobqueue.messaging.message.ConstraintChangeMessage;
import com.birbit.android.jobqueue.messaging.message.JobConsumerIdleMessage;
import com.birbit.android.jobqueue.messaging.message.PublicQueryMessage;
import com.birbit.android.jobqueue.messaging.message.RunJobResultMessage;
import com.birbit.android.jobqueue.messaging.message.SchedulerMessage;
import com.birbit.android.jobqueue.network.NetworkEventProvider;
import com.birbit.android.jobqueue.network.NetworkUtil;
import com.birbit.android.jobqueue.scheduling.Scheduler;
import com.birbit.android.jobqueue.scheduling.SchedulerConstraint;
import com.birbit.android.jobqueue.timer.Timer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

class JobManagerThread implements Runnable, NetworkEventProvider.Listener {
    public static final long NS_PER_MS = 1000000;
    public static final long NOT_RUNNING_SESSION_ID = Long.MIN_VALUE;
    public static final long NOT_DELAYED_JOB_DELAY = Long.MIN_VALUE;


    final Timer timer;
    private final Context appContext;
    @SuppressWarnings("FieldCanBeLocal")
    private final long sessionId;
    final JobQueue persistentJobQueue;
    final JobQueue nonPersistentJobQueue;
    private final NetworkUtil networkUtil;
    private final DependencyInjector dependencyInjector;
    private final MessageFactory messageFactory;
    final ConsumerManager consumerManager;
    @Nullable private List<CancelHandler> pendingCancelHandlers;
    @Nullable private List<SchedulerConstraint> pendingSchedulerCallbacks;
    final Constraint queryConstraint = new Constraint();

    final CallbackManager callbackManager;

    private boolean running = true;
    /**
     * We set this to true whenever we schedule a wake up and set to false whenever we call
     * cancelAll. It is not precise, does not cover scheduling across reboots but a fair comprimise
     * for simplicity.
     */
    private boolean shouldCancelAllScheduledWhenEmpty = false;

    final PriorityMessageQueue messageQueue;
    @Nullable
    Scheduler scheduler;

    JobManagerThread(Configuration config, PriorityMessageQueue messageQueue,
            MessageFactory messageFactory) {
        this.messageQueue = messageQueue;
        if(config.getCustomLogger() != null) {
            JqLog.setCustomLogger(config.getCustomLogger());
        }
        this.messageFactory = messageFactory;
        timer = config.getTimer();
        appContext = config.getAppContext();
        sessionId = timer.nanoTime();
        scheduler = config.getScheduler();
        if (scheduler != null && config.batchSchedulerRequests() &&
                !(scheduler instanceof BatchingScheduler)) {
            scheduler = new BatchingScheduler(scheduler, timer);
        }
        this.persistentJobQueue = config.getQueueFactory()
                .createPersistentQueue(config, sessionId);
        this.nonPersistentJobQueue = config.getQueueFactory()
                .createNonPersistent(config, sessionId);
        networkUtil = config.getNetworkUtil();
        dependencyInjector = config.getDependencyInjector();
        if(networkUtil instanceof NetworkEventProvider) {
            ((NetworkEventProvider) networkUtil).setListener(this);
        }
        consumerManager = new ConsumerManager(this, timer, messageFactory, config);
        callbackManager = new CallbackManager(messageFactory, timer);
    }

    void addCallback(JobManagerCallback callback) {
        callbackManager.addCallback(callback);
    }

    boolean removeCallback(JobManagerCallback callback) {
        return callbackManager.removeCallback(callback);
    }

    boolean canListenToNetwork() {
        return networkUtil instanceof NetworkEventProvider;
    }

    private void handleAddJob(AddJobMessage message) {
        Job job = message.getJob();
        //noinspection deprecation
        long now = timer.nanoTime();
        long delayUntilNs = job.getDelayInMs() > 0
                ? now + job.getDelayInMs() * NS_PER_MS
                : NOT_DELAYED_JOB_DELAY;
        long deadline = job.getDeadlineInMs() > 0
                ? now + job.getDeadlineInMs() * NS_PER_MS
                : Params.FOREVER;
        JobHolder jobHolder = new JobHolder.Builder()
                .priority(job.getPriority())
                .job(job)
                .groupId(job.getRunGroupId())
                .createdNs(now)
                .delayUntilNs(delayUntilNs)
                .id(job.getId())
                .tags(job.getTags())
                .persistent(job.isPersistent())
                .runCount(0)
                .deadline(deadline, job.shouldCancelOnDeadline())
                .requiredNetworkType(job.requiredNetworkType)
                .runningSessionId(NOT_RUNNING_SESSION_ID).build();

        JobHolder oldJob = findJobBySingleId(job.getSingleInstanceId());
        final boolean insert = oldJob == null || consumerManager.isJobRunning(oldJob.getId());
        if (insert) {
            JobQueue queue = job.isPersistent() ? persistentJobQueue : nonPersistentJobQueue;
            if (oldJob != null) { //the other job was running, will be cancelled if it fails
                consumerManager.markJobsCancelledSingleId(TagConstraint.ANY, new String[]{job.getSingleInstanceId()});
                queue.substitute(jobHolder, oldJob);
            } else {
                queue.insert(jobHolder);
            }
            if (JqLog.isDebugEnabled()) {
                JqLog.d("added job class: %s priority: %d delay: %d group : %s persistent: %s"
                        , job.getClass().getSimpleName(), job.getPriority(), job.getDelayInMs()
                        , job.getRunGroupId(), job.isPersistent());
            }
        } else {
            JqLog.d("another job with same singleId: %s was already queued", job.getSingleInstanceId());
        }
        if(dependencyInjector != null) {
            //inject members b4 calling onAdded
            dependencyInjector.inject(job);
        }
        jobHolder.setApplicationContext(appContext);
        jobHolder.getJob().onAdded();
        callbackManager.notifyOnAdded(jobHolder.getJob());
        if (insert) {
            consumerManager.onJobAdded();
            if (job.isPersistent()) {
                scheduleWakeUpFor(jobHolder, now);
            }
        } else {
            cancelSafely(jobHolder, CancelReason.SINGLE_INSTANCE_ID_QUEUED);
            callbackManager.notifyOnDone(jobHolder.getJob());
        }
    }

    private void scheduleWakeUpFor(JobHolder holder, long now) {
        if (scheduler == null) {
            return;
        }
        int requiredNetwork = holder.requiredNetworkType;
        long delayUntilNs = holder.getDelayUntilNs();
        long deadlineNs = holder.getDeadlineNs();
        long delay = delayUntilNs > now ? TimeUnit.NANOSECONDS.toMillis(delayUntilNs - now) : 0;
        Long deadline = deadlineNs != Params.FOREVER
                ? TimeUnit.NANOSECONDS.toMillis(deadlineNs - now)
                : null;
        boolean hasLargeDelay = delayUntilNs > now && delay >= JobManager.MIN_DELAY_TO_USE_SCHEDULER_IN_MS;
        boolean hasLargeDeadline = deadline != null && deadline >= JobManager.MIN_DELAY_TO_USE_SCHEDULER_IN_MS;
        if (requiredNetwork == NetworkUtil.DISCONNECTED && !hasLargeDelay && !hasLargeDeadline) {
            return;
        }

        SchedulerConstraint constraint = new SchedulerConstraint(UUID.randomUUID().toString());
        constraint.setNetworkStatus(requiredNetwork);
        constraint.setDelayInMs(delay);
        constraint.setOverrideDeadlineInMs(deadline);
        scheduler.request(constraint);
        shouldCancelAllScheduledWhenEmpty = true;
    }

    /**
     * Returns a queued job with the same single id. If any matching non-running job is found,
     * that one is returned. Otherwise any matching running job will be returned.
     */
    private JobHolder findJobBySingleId(/*Nullable*/String singleIdTag) {
        if (singleIdTag != null) {
            queryConstraint.clear();
            queryConstraint.setTags(new String[]{singleIdTag});
            queryConstraint.setTagConstraint(TagConstraint.ANY);
            queryConstraint.setMaxNetworkType(NetworkUtil.UNMETERED);
            Set<JobHolder> jobs = nonPersistentJobQueue.findJobs(queryConstraint);
            jobs.addAll(persistentJobQueue.findJobs(queryConstraint));
            if (!jobs.isEmpty()) {
                for (JobHolder job : jobs) {
                    if (!consumerManager.isJobRunning(job.getId())) {
                        return job;
                    }
                }
                return jobs.iterator().next();
            }
        }
        return null;
    }

    @Override
    public void run() {
        messageQueue.consume(new MessageQueueConsumer() {
            @Override
            public void handleMessage(Message message) {
                switch (message.type) {
                    case ADD_JOB:
                        handleAddJob((AddJobMessage) message);
                        break;
                    case JOB_CONSUMER_IDLE:
                        boolean busy = consumerManager.handleIdle((JobConsumerIdleMessage) message);
                        if (!busy) {
                            invokeSchedulersIfIdle();
                        }
                        break;
                    case RUN_JOB_RESULT:
                        handleRunJobResult((RunJobResultMessage) message);
                        break;
                    case CONSTRAINT_CHANGE:
                        consumerManager.handleConstraintChange();
                        break;
                    case CANCEL:
                        handleCancel((CancelMessage) message);
                        break;
                    case PUBLIC_QUERY:
                        handlePublicQuery((PublicQueryMessage) message);
                        break;
                    case COMMAND:
                        handleCommand((CommandMessage) message);
                        break;
                    case SCHEDULER:
                        handleSchedulerMessage((SchedulerMessage) message);
                        break;
                }
            }

            @Override
            public void onIdle() {
                JqLog.d("joq idle. running:? %s", running);
                if (!running) {
                    return;
                }
                Long nextJobTimeNs = getNextWakeUpNs(true);
                // TODO check network should be another message which goes idle if network is the
                // same as now
                JqLog.d("Job queue idle. next job at: %s", nextJobTimeNs);
                if (nextJobTimeNs != null) {
                    ConstraintChangeMessage constraintMessage =
                            messageFactory.obtain(ConstraintChangeMessage.class);
                    messageQueue.postAt(constraintMessage, nextJobTimeNs);
                } else if (scheduler != null) {
                    // if we have a scheduler but the queue is empty, just clean them all.
                    if (shouldCancelAllScheduledWhenEmpty && persistentJobQueue.count() == 0) {
                        shouldCancelAllScheduledWhenEmpty = false;
                        scheduler.cancelAll();
                    }
                }
            }
        });
    }

    private void invokeSchedulersIfIdle() {
        if (scheduler == null || pendingSchedulerCallbacks == null
                || pendingSchedulerCallbacks.isEmpty() || !consumerManager.areAllConsumersIdle()) {
            return;
        }
        for (int i = pendingSchedulerCallbacks.size() - 1; i >= 0; i--) {
            SchedulerConstraint constraint = pendingSchedulerCallbacks.remove(i);
            boolean reschedule = hasJobsWithSchedulerConstraint(constraint);
            scheduler.onFinished(constraint, reschedule);
        }
    }

    private void handleSchedulerMessage(SchedulerMessage message) {
        final int what = message.getWhat();
        if (what == SchedulerMessage.START) {
            handleSchedulerStart(message.getConstraint());
        } else if (what == SchedulerMessage.STOP) {
            handleSchedulerStop(message.getConstraint());
        } else {
            throw new IllegalArgumentException("Unknown scheduler message with what " + what);
        }
    }

    private boolean hasJobsWithSchedulerConstraint(SchedulerConstraint constraint) {
        if (consumerManager.hasJobsWithSchedulerConstraint(constraint)) {
            return true;
        }

        queryConstraint.clear();
        queryConstraint.setNowInNs(timer.nanoTime());
        queryConstraint.setMaxNetworkType(constraint.getNetworkStatus());
        return persistentJobQueue.countReadyJobs(queryConstraint) > 0;
    }

    private void handleSchedulerStop(SchedulerConstraint constraint) {
        final List<SchedulerConstraint> pendingCallbacks = this.pendingSchedulerCallbacks;
        if (pendingCallbacks != null) {
            for (int i = pendingCallbacks.size() - 1; i >= 0; i--) {
                SchedulerConstraint pendingConstraint = pendingCallbacks.get(i);
                if (pendingConstraint.getUuid().equals(constraint.getUuid())) {
                    pendingCallbacks.remove(i);
                }
            }
        }
        if (scheduler == null) {
            return;//nothing to do
        }
        final boolean hasMatchingJobs = hasJobsWithSchedulerConstraint(constraint);
        if (hasMatchingJobs) {
            // reschedule
            scheduler.request(constraint);
        }
    }


    private void handleSchedulerStart(SchedulerConstraint constraint) {
        if (!isRunning()) {
            if (scheduler != null) {
                scheduler.onFinished(constraint, true);
            }
            return;
        }
        boolean hasMatchingJobs = hasJobsWithSchedulerConstraint(constraint);
        if (!hasMatchingJobs) {
            if (scheduler != null) {
                scheduler.onFinished(constraint, false);
            }
            return;
        }
        if (pendingSchedulerCallbacks == null) {
            pendingSchedulerCallbacks = new ArrayList<>();
        }
        // add this to callbacks to be invoked when job runs
        pendingSchedulerCallbacks.add(constraint);
        consumerManager.handleConstraintChange();
    }

    private void handleCommand(CommandMessage message) {
        if (message.getWhat() == CommandMessage.QUIT) {
            messageQueue.stop();
            messageQueue.clear();
        }
    }

    int count() {
        return persistentJobQueue.count() + nonPersistentJobQueue.count();
    }

    private void handlePublicQuery(PublicQueryMessage message) {
        switch (message.getWhat()) {
            case PublicQueryMessage.COUNT:
                message.getCallback().onResult(count());
                break;
            case PublicQueryMessage.COUNT_READY:
                message.getCallback().onResult(countReadyJobs(getNetworkStatus()));
                break;
            case PublicQueryMessage.START:
                JqLog.d("handling start request...");
                if (running) {
                    return;
                }
                running = true;
                consumerManager.handleConstraintChange();
                break;
            case PublicQueryMessage.STOP:
                JqLog.d("handling stop request...");
                running = false;
                consumerManager.handleStop();
                break;
            case PublicQueryMessage.JOB_STATUS:
                JobStatus status = getJobStatus(message.getStringArg());
                message.getCallback().onResult(status.ordinal());
                break;
            case PublicQueryMessage.CLEAR:
                clear();
                if (message.getCallback() != null) {
                    message.getCallback().onResult(0);
                }
                break;
            case PublicQueryMessage.ACTIVE_CONSUMER_COUNT:
                message.getCallback().onResult(consumerManager.getWorkerCount());
                break;
            case PublicQueryMessage.INTERNAL_RUNNABLE:
                message.getCallback().onResult(0);
                break;
            default:
                throw new IllegalArgumentException("cannot handle public query with type " +
                message.getWhat());
        }
    }

    private void clear() {
        nonPersistentJobQueue.clear();
        persistentJobQueue.clear();
    }

    private JobStatus getJobStatus(String id) {
        if (consumerManager.isJobRunning(id)) {
            return JobStatus.RUNNING;
        }
        JobHolder holder;
        holder = nonPersistentJobQueue.findJobById(id);
        if (holder == null) {
            holder = persistentJobQueue.findJobById(id);
        }
        if(holder == null) {
            return JobStatus.UNKNOWN;
        }
        final int networkStatus = getNetworkStatus();
        final long now = timer.nanoTime();
        if(networkStatus < holder.requiredNetworkType) {
            return JobStatus.WAITING_NOT_READY;
        }
        if(holder.getDelayUntilNs() > now) {
            return JobStatus.WAITING_NOT_READY;
        }
        return JobStatus.WAITING_READY;
    }

    private void handleCancel(CancelMessage message) {
        CancelHandler handler = new CancelHandler(message.getConstraint(), message.getTags(),
                message.getCallback());
        handler.query(this, consumerManager);
        if (handler.isDone()) {
            handler.commit(this);
        } else {
            if (pendingCancelHandlers == null) {
                pendingCancelHandlers = new ArrayList<>();
            }
            pendingCancelHandlers.add(handler);
        }
    }

    private void handleRunJobResult(RunJobResultMessage message) {
        final int result = message.getResult();
        final JobHolder jobHolder = message.getJobHolder();
        callbackManager.notifyOnRun(jobHolder.getJob(), result);
        RetryConstraint retryConstraint = null;
        switch (result) {
            case JobHolder.RUN_RESULT_SUCCESS:
                removeJob(jobHolder);
                break;
            case JobHolder.RUN_RESULT_FAIL_RUN_LIMIT:
                cancelSafely(jobHolder, CancelReason.REACHED_RETRY_LIMIT);
                removeJob(jobHolder);
                break;
            case JobHolder.RUN_RESULT_FAIL_SHOULD_RE_RUN:
                cancelSafely(jobHolder, CancelReason.CANCELLED_VIA_SHOULD_RE_RUN);
                removeJob(jobHolder);
                break;
            case JobHolder.RUN_RESULT_FAIL_SINGLE_ID:
                cancelSafely(jobHolder, CancelReason.SINGLE_INSTANCE_WHILE_RUNNING);
                removeJob(jobHolder);
                break;
            case JobHolder.RUN_RESULT_HIT_DEADLINE:
                cancelSafely(jobHolder, CancelReason.REACHED_DEADLINE);
                removeJob(jobHolder);
                break;
            case JobHolder.RUN_RESULT_TRY_AGAIN:
                retryConstraint = jobHolder.getRetryConstraint();
                insertOrReplace(jobHolder);
                break;
            case JobHolder.RUN_RESULT_FAIL_FOR_CANCEL:
                JqLog.d("running job failed and cancelled, doing nothing. "
                        + "Will be removed after it's onCancel is called by the "
                        + "CancelHandler");
                break;
            default:
                throw new IllegalArgumentException("unknown job holder result");
        }
        consumerManager.handleRunJobResult(message, jobHolder, retryConstraint);
        callbackManager.notifyAfterRun(jobHolder.getJob(), result);
        if (pendingCancelHandlers != null) {
            int limit = pendingCancelHandlers.size();
            for (int i = 0; i < limit; i ++) {
                CancelHandler handler = pendingCancelHandlers.get(i);
                handler.onJobRun(jobHolder, result);
                if (handler.isDone()) {
                    handler.commit(this);
                    pendingCancelHandlers.remove(i);
                    i--;
                    limit--;
                }
            }
        }
    }

    private void cancelSafely(JobHolder jobHolder, @CancelReason int cancelReason) {
        try {
            jobHolder.onCancel(cancelReason);
        } catch (Throwable t) {
            JqLog.e(t, "job's onCancel did throw an exception, ignoring...");
        }
        callbackManager.notifyOnCancel(jobHolder.getJob(), false, jobHolder.getThrowable());
    }

    private void insertOrReplace(JobHolder jobHolder) {
        RetryConstraint retryConstraint = jobHolder.getRetryConstraint();
        if (retryConstraint == null) {
            reAddJob(jobHolder);
            return;
        }
        if (retryConstraint.getNewPriority() != null) {
            jobHolder.setPriority(retryConstraint.getNewPriority());
        }
        long delay = -1;
        if (retryConstraint.getNewDelayInMs() != null) {
            delay = retryConstraint.getNewDelayInMs();
        }
        jobHolder.setDelayUntilNs(
                delay > 0 ? timer.nanoTime() + delay * NS_PER_MS : NOT_DELAYED_JOB_DELAY
        );
        reAddJob(jobHolder);
    }

    private void reAddJob(JobHolder jobHolder) {
        if (!jobHolder.isCancelled()) {
            if (jobHolder.getJob().isPersistent()) {
                persistentJobQueue.insertOrReplace(jobHolder);
            } else {
                nonPersistentJobQueue.insertOrReplace(jobHolder);
            }
        } else {
            JqLog.d("not re-adding cancelled job " + jobHolder);
        }
    }

    private void removeJob(JobHolder jobHolder) {
        if (jobHolder.getJob().isPersistent()) {
            persistentJobQueue.remove(jobHolder);
        } else {
            nonPersistentJobQueue.remove(jobHolder);
        }
        callbackManager.notifyOnDone(jobHolder.getJob());
    }

    @Override
    public void onNetworkChange(@NetworkUtil.NetworkStatus int networkStatus) {
        ConstraintChangeMessage constraint = messageFactory.obtain(ConstraintChangeMessage.class);
        messageQueue.post(constraint);
    }

    boolean isRunning() {
        return running;
    }

    int countRemainingReadyJobs() {
        return countReadyJobs(getNetworkStatus());
    }

    private int countReadyJobs(@NetworkUtil.NetworkStatus int networkStatus) {
        final Collection<String> runningJobs = consumerManager.runningJobGroups.getSafe();
        queryConstraint.clear();
        queryConstraint.setNowInNs(timer.nanoTime());
        queryConstraint.setMaxNetworkType(networkStatus);
        queryConstraint.setExcludeGroups(runningJobs);
        queryConstraint.setExcludeRunning(true);
        queryConstraint.setTimeLimit(timer.nanoTime());
        //TODO we can cache this
        int total = 0;
        total += nonPersistentJobQueue.countReadyJobs(queryConstraint);
        total += persistentJobQueue.countReadyJobs(queryConstraint);
        return total;
    }

    @NetworkUtil.NetworkStatus
    private int getNetworkStatus() {
        return networkUtil == null ? NetworkUtil.UNMETERED : networkUtil.getNetworkStatus(appContext);
    }

    Long getNextWakeUpNs(boolean includeNetworkWatch) {
        final Long groupDelay = consumerManager.runningJobGroups.getNextDelayForGroups();
        final int networkStatus = getNetworkStatus();
        final Collection<String> groups = consumerManager.runningJobGroups.getSafe();
        queryConstraint.clear();
        queryConstraint.setNowInNs(timer.nanoTime());
        queryConstraint.setMaxNetworkType(networkStatus);
        queryConstraint.setExcludeGroups(groups);
        queryConstraint.setExcludeRunning(true);
        final Long nonPersistent = nonPersistentJobQueue.getNextJobDelayUntilNs(queryConstraint);
        final Long persistent = persistentJobQueue.getNextJobDelayUntilNs(queryConstraint);
        Long delay = null;
        if (groupDelay != null) {
            delay = groupDelay;
        }
        if (nonPersistent != null) {
            delay = delay == null ? nonPersistent : Math.min(nonPersistent, delay);
        }
        if (persistent != null) {
            delay = delay == null ? persistent : Math.min(persistent, delay);
        }
        if (includeNetworkWatch && !(networkUtil instanceof NetworkEventProvider)) {
            // if network cannot provide events, we need to wake up :/
            long checkNetworkAt = timer.nanoTime() + JobManager.NETWORK_CHECK_INTERVAL;
            delay = delay == null ? checkNetworkAt : Math.min(checkNetworkAt, delay);
        }
        return delay;
    }

    // Used for testing
    JobHolder getNextJobForTesting() {
        return getNextJobForTesting(null);
    }

    // Used for testing
    JobHolder getNextJobForTesting(Collection<String> runningJobGroups) {
        return getNextJob(runningJobGroups, true);
    }

    JobHolder getNextJob(Collection<String> runningJobGroups) {
        return getNextJob(runningJobGroups, false);
    }

    JobHolder getNextJob(Collection<String> runningJobGroups, boolean ignoreRunning) {
        if (!running && !ignoreRunning) {
            return null;
        }
        JobHolder jobHolder = null;
        while (jobHolder == null) {
            final int networkStatus = getNetworkStatus();
            boolean persistent = false;
            JqLog.v("looking for next job");
            queryConstraint.clear();
            long now = timer.nanoTime();
            queryConstraint.setNowInNs(now);
            queryConstraint.setMaxNetworkType(networkStatus);
            queryConstraint.setExcludeGroups(runningJobGroups);
            queryConstraint.setExcludeRunning(true);
            queryConstraint.setTimeLimit(now);
            jobHolder = nonPersistentJobQueue.nextJobAndIncRunCount(queryConstraint);
            JqLog.v("non persistent result %s", jobHolder);
            if (jobHolder == null) {
                //go to disk, there aren't any non-persistent jobs
                jobHolder = persistentJobQueue.nextJobAndIncRunCount(queryConstraint);
                persistent = true;
                JqLog.v("persistent result %s", jobHolder);
            }
            if (jobHolder == null) {
                return null;
            }
            if (persistent && dependencyInjector != null) {
                dependencyInjector.inject(jobHolder.getJob());
            }
            jobHolder.setApplicationContext(appContext);
            jobHolder.setDeadlineIsReached(jobHolder.getDeadlineNs() <= now);
            if (jobHolder.getDeadlineNs() <= now
                    && jobHolder.shouldCancelOnDeadline()) {
                cancelSafely(jobHolder, CancelReason.REACHED_DEADLINE);
                removeJob(jobHolder);
                jobHolder = null;
            }
        }
        return jobHolder;
    }
}