This is part 4 of an N-part series on rewriting my podgrabber application. Here are links to part one, part two, and part three. In part three, I outlined my strategy for synchronizing between mediaStores. This post updates that strategy slightly to show how I’m now handling threading.

For the curious, the code lives in a Bazaar repository at http://bzr.podgrabber.org/trunk/

The SyncManager now takes a taskManager in its constructor.

class SyncManager(object):
    """This is a concrete implementation of a synchronization manager which is
    intended to be subclassed if necessary.

    A SyncManager connects two mediaStores with filters and processing steps.
    It should be able to copy files from the fromStore to the toStore, exclude
    any files which were filtered out, and execute any processingSteps along
    the way.
    """
    def __init__(self, fromStore, toStore, copyFilters, deleteFilters, preProcessingSteps, postProcessingSteps, taskManager):
        self.fromStore = fromStore
        self.toStore = toStore
        self.copyFilters = copyFilters
        self.deleteFilters = deleteFilters
        self.preProcessingSteps = preProcessingSteps
        self.postProcessingSteps = postProcessingSteps
        self.taskManager = taskManager
        self._init()

And on copying a file, the SyncManager pushes the request to the task manager:

    def syncCopy(self):
        for mediaFile in self.getCopyList():
            print "ADDING MEDIA FILE", mediaFile
            logger.info("Copying file %s" % mediaFile)
            self.taskManager.addCopyFile(mediaFile, self.toStore, self.preProcessingSteps, self.postProcessingSteps)

Here is the task manager code in its entirety:

from Queue import Queue
import thread
import threading
import time

import logging
logger = logging.getLogger("podgrabber.syncTaskManager")

class Shutdown(object):
    pass

class CopyWorker(threading.Thread):
    def __init__(self, q, fileDict):
        self.q = q
        self.fileDict = fileDict
        threading.Thread.__init__(self)
    def run(self):
        #print "Running copy thread", self.getName()
        logger.info("Running")
        while 1:
            logger.debug("Blocking while pulling items from queue")
            mediaFile, mediaStore, preProc, postProc = self.q.get()
            if isinstance(mediaFile, Shutdown):
                #print "Break"
                logger.info("Shutting down")
                break
            logger.debug("Retrieved items from queue")
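            # Remember the key used in addCopyFile; a processing step may
            # return a different object than the one that was queued.
            queuedFile = mediaFile
            for preProcessingStep in preProc:
                mediaFile = preProcessingStep.process(mediaFile)
            logger.debug("Retrieving file %s" % mediaFile)
            mediaStore.addFile(mediaFile)
            logger.debug("Done retrieving file %s" % mediaFile)
            for postProcessingStep in postProc:
                mediaFile = postProcessingStep.process(mediaFile)
            # Record the completion time under the original key.
            self.fileDict[queuedFile][1] = time.time()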

class TaskManager(object):
    def __init__(self, numCopyThreads=5, numDeleteThreads=2):
        self.copyQueue = Queue()
        self.deleteQueue = Queue()
        self.numCopyThreads = numCopyThreads
        self.numDeleteThreads = numDeleteThreads
        self.threadList = []
        self.fileDict = {}
        for i in range(numCopyThreads):
            #thread.start_new_thread(self._copyFile, ())
            copyWorker = CopyWorker(self.copyQueue, self.fileDict)
            copyWorker.setDaemon(True)
            copyWorker.start()
            self.threadList.append(copyWorker)
        for i in range(numDeleteThreads):
            pass
    def addCopyFile(self, mediaFile, mediaStore, preProc, postProc):
        self.fileDict[mediaFile] = [time.time(), None]
        self.copyQueue.put((mediaFile, mediaStore, preProc, postProc))
    def addDeleteFile(self, mediaFile, mediaStore, preProc, postProc):
        self.deleteQueue.put((mediaFile, mediaStore, preProc, postProc))
    def _copyFile(self):
        while 1:
            mediaFile, mediaStore, preProc, postProc = self.copyQueue.get()
            if isinstance(mediaFile, Shutdown):
                break
            for preProcessingStep in preProc:
                mediaFile = preProcessingStep.process(mediaFile)
            mediaStore.addFile(mediaFile)
            for postProcessingStep in postProc:
                mediaFile = postProcessingStep.process(mediaFile)
    def _deleteFile(self):
        pass
    def shutdown(self):
        for i in range(self.numCopyThreads):
            self.copyQueue.put((Shutdown(), None, None, None))
        for i in range(self.numDeleteThreads):
            self.deleteQueue.put((Shutdown(), None, None, None))
        for t in self.threadList:
            t.join()

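Basically, the task manager creates one queue for copies and one for deletes, and is meant to run a pool of worker threads for each. Only the copy workers exist so far; the delete workers are still a stub. When the sync manager asks the task manager to copy a file, the call does not block: the request just goes onto the copy queue. Files are then downloaded and processed N at a time, where N is the number of copy threads. The defaults are 5 threads for copying and 2 for deleting. On shutdown, the task manager puts one Shutdown marker per thread on each queue and then joins the workers, so everything that was already queued finishes first.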
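To make the queue-and-sentinel idea easier to try in isolation, here is a stripped-down sketch of the copy side only. It is not the podgrabber code: it is written for Python 3 (so `queue` instead of `Queue`), it leaves out the processing steps, and `ListStore`, `copyWorker`, and `runDemo` are made-up names for illustration.

```python
import queue
import threading
import time


class Shutdown(object):
    """Sentinel placed on the queue to tell one worker to exit."""


class ListStore(object):
    """Hypothetical stand-in for a mediaStore; only addFile() is modeled."""
    def __init__(self):
        self.files = []
        self._lock = threading.Lock()

    def addFile(self, mediaFile):
        time.sleep(0.01)  # pretend this is a slow download
        with self._lock:
            self.files.append(mediaFile)


def copyWorker(q, fileDict):
    """Pull (mediaFile, mediaStore) pairs off the queue until a sentinel arrives."""
    while True:
        mediaFile, mediaStore = q.get()
        if isinstance(mediaFile, Shutdown):
            break
        mediaStore.addFile(mediaFile)
        fileDict[mediaFile][1] = time.time()  # record the finish time


def runDemo(names, numCopyThreads=5):
    """Queue every name, then shut the workers down and wait for them."""
    q = queue.Queue()
    fileDict = {}
    store = ListStore()
    workers = [threading.Thread(target=copyWorker, args=(q, fileDict))
               for _ in range(numCopyThreads)]
    for w in workers:
        w.daemon = True
        w.start()
    for name in names:
        fileDict[name] = [time.time(), None]  # [queued at, finished at]
        q.put((name, store))  # returns immediately; a worker does the copy
    # One sentinel per worker. The queue is FIFO, so the sentinels are only
    # reached after all of the real work has been handed out.
    for _ in workers:
        q.put((Shutdown(), None))
    for w in workers:
        w.join()
    return store, fileDict
```

Calling `runDemo(["a.mp3", "b.mp3", "c.mp3"], numCopyThreads=2)` returns the store and the timing dictionary once both workers have exited. Each worker consumes exactly one sentinel, which is why the number of sentinels has to match the number of threads.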

So far, this seems to be working pretty well. The one thing I see that could use immediate improvement is the downloading of the RSS feed(s): I could thread it, use Doug’s feedcache, or both. I’m going to try to get that supported over the weekend.
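Threading the feed downloads could reuse the same pattern. The following is only a rough sketch of that idea, not what podgrabber does today: it targets Python 3, `fetchFeed` is a hypothetical placeholder that fakes the download, `fetchAll` is a made-up name, and it does not use feedcache at all.

```python
import queue
import threading


def fetchFeed(url):
    """Hypothetical stand-in for the real download; returns fake feed text."""
    return "<rss>%s</rss>" % url


def fetchAll(urls, numThreads=3, fetch=fetchFeed):
    """Fetch every URL using numThreads workers; return {url: result}."""
    work = queue.Queue()
    results = {}
    lock = threading.Lock()

    def worker():
        while True:
            url = work.get()
            if url is None:  # None is the shutdown sentinel in this sketch
                break
            try:
                body = fetch(url)
            except Exception as e:
                body = e  # record the failure instead of killing the thread
            with lock:
                results[url] = body

    threads = [threading.Thread(target=worker) for _ in range(numThreads)]
    for t in threads:
        t.start()
    for url in urls:
        work.put(url)
    for _ in threads:
        work.put(None)  # one sentinel per worker
    for t in threads:
        t.join()
    return results
```

A failed fetch is stored as the exception object rather than raised, so one bad feed does not take a worker down with it. The caller would have to check each result before parsing it.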

The next piece of functionality to add is a GUI. Based on the feedback I received on a post today, I’m going to have to check out wxPython.