| 1 | ;;;; Copyright (C) 2011 Mark Evenson |
|---|
| 2 | |
|---|
| 3 | (in-package #:threads) |
|---|
| 4 | |
|---|
| 5 | (require 'abcl-contrib) |
|---|
| 6 | (eval-when (:compile-toplevel) |
|---|
| 7 | (require 'jss)) |
|---|
| 8 | |
|---|
| 9 | (defparameter *server-running-p* nil) |
|---|
| 10 | |
|---|
| 11 | ;;; XXX possibly need multiple thread pools |
|---|
| 12 | (defparameter *thread-pool* nil |
|---|
| 13 | "The current JVM class implementing the ScheduledThreadPool abstraction.") |
|---|
| 14 | (defparameter *scheduled-futures* nil) |
|---|
| 15 | (defparameter *incoming-scheduled-future* nil) |
|---|
| 16 | (defparameter *watch-queue-future* nil) |
|---|
| 17 | |
|---|
| 18 | |
|---|
| 19 | ;;;; Configure the directories for a threadpool from these defaults. |
|---|
| 20 | (defparameter *root* #p"/var/tmp/abcl-threads/") |
|---|
| 21 | |
|---|
| 22 | (defparameter *logs* (merge-pathnames "logs/" *root*)) |
|---|
| 23 | |
|---|
| 24 | (defparameter *incoming* (merge-pathnames "incoming/" *root*)) |
|---|
| 25 | (defparameter *dirs* (list *incoming*)) |
|---|
| 26 | |
|---|
| 27 | (defparameter *queue* (merge-pathnames "queue/" *root*)) |
|---|
| 28 | |
|---|
| 29 | (defparameter *processed* (merge-pathnames "processed/" *root*)) |
|---|
| 30 | |
|---|
| 31 | |
|---|
| 32 | ;;;; A simple logging abstraction. |
|---|
| 33 | |
|---|
| 34 | (defconstant +month-names+ '("Jan" "Feb" "Mar" "Apr" "May" "Jun" |
|---|
| 35 | "Jul" "Aug" "Sep" "Oct" "Nov" "Dec")) |
|---|
| 36 | (defconstant +seconds+ (java:jfield "java.util.concurrent.TimeUnit" "SECONDS")) |
|---|
| 37 | (defparameter *log* *standard-output*) |
|---|
| 38 | |
|---|
| 39 | (defun format-time (universal-time) |
|---|
| 40 | (multiple-value-bind |
|---|
| 41 | (second minute hour date month year day-of-week dst-p tz) |
|---|
| 42 | (decode-universal-time universal-time) |
|---|
| 43 | (format nil "~&~A ~A ~2,'0D:~2,'0D:~2,'0D" |
|---|
| 44 | (nth (1- month) +month-names+) date hour minute second))) |
|---|
| 45 | |
|---|
| 46 | (defmacro log (message &rest parameters) |
|---|
| 47 | `(when *log* |
|---|
| 48 | (format *log* "~A " (format-time (get-universal-time))) |
|---|
| 49 | (format *log* ,message ,@parameters) |
|---|
| 50 | (format *log* "~&"))) |
|---|
| 51 | |
|---|
| 52 | ;;; Start a pool of hungry philosophers. |
|---|
| 53 | (defun start-server () |
|---|
| 54 | (when *server-running-p* |
|---|
| 55 | (error "Server not recorded as stopped.")) |
|---|
| 56 | (unless |
|---|
| 57 | (mapcar #'ensure-directories-exist *dirs*) |
|---|
| 58 | (error "Failed to create directories under '~A'." *root*)) |
|---|
| 59 | (let ((logfile (merge-pathnames "abcl-threads.log" *logs*))) |
|---|
| 60 | (setf *log* |
|---|
| 61 | (open logfile :direction :output :if-exists :append :if-does-not-exist :create)) |
|---|
| 62 | (format *standard-output* "Logging to ~A." logfile)) |
|---|
| 63 | (log "Starts.") |
|---|
| 64 | (schedule-threads) |
|---|
| 65 | (setf *server-running-p* t)) |
|---|
| 66 | |
|---|
| 67 | (defun stop-server (&key (force nil)) |
|---|
| 68 | (unless force |
|---|
| 69 | (unless *server-running-p* |
|---|
| 70 | (error "Server not recorded as running."))) |
|---|
| 71 | (log "Stopping the server.") |
|---|
| 72 | (dolist (future `(,*incoming-scheduled-future* ,*watch-queue-future* ,@*scheduled-futures*)) |
|---|
| 73 | (when (not (or (#"isCancelled" future) |
|---|
| 74 | (#"isDone" future))) |
|---|
| 75 | (#"cancel" future t))) |
|---|
| 76 | (#"shutdown" *thread-pool*) |
|---|
| 77 | (close *log*) |
|---|
| 78 | (setf *server-running-p* nil)) |
|---|
| 79 | |
|---|
| 80 | (defun schedule-threads () |
|---|
| 81 | (log "Starting thread pool.") |
|---|
| 82 | (when *thread-pool* |
|---|
| 83 | (log "Removing existing incoming thread pool.")) |
|---|
| 84 | (setf *thread-pool* |
|---|
| 85 | (#"newScheduledThreadPool" 'java.util.concurrent.Executors 1)) |
|---|
| 86 | (#"setExecuteExistingDelayedTasksAfterShutdownPolicy" *thread-pool* nil) |
|---|
| 87 | (initialize-queue) |
|---|
| 88 | (log "Scheduling queue watcher.") |
|---|
| 89 | (setf *watch-queue-future* |
|---|
| 90 | (#"scheduleWithFixedDelay" |
|---|
| 91 | *thread-pool* |
|---|
| 92 | (make-watch-queue) 10 1 +seconds+)) |
|---|
| 93 | (log "Scheduling incoming watcher.") |
|---|
| 94 | (setf *incoming-scheduled-future* |
|---|
| 95 | (#"scheduleWithFixedDelay" |
|---|
| 96 | *thread-pool* |
|---|
| 97 | (make-process-incoming) 1 1 +seconds+))) |
|---|
| 98 | |
|---|
| 99 | (defun make-process-incoming () |
|---|
| 100 | (java:jinterface-implementation "java.lang.Runnable" "run" #'process-incoming)) |
|---|
| 101 | |
|---|
| 102 | (defun process-incoming () |
|---|
| 103 | (flet ((reject-input (file invalid) |
|---|
| 104 | (warn (format nil "~A is ~A" file invalid))) |
|---|
| 105 | (process (file) |
|---|
| 106 | nil)) |
|---|
| 107 | |
|---|
| 108 | (let ((incoming (directory (merge-pathnames *incoming* "*")))) |
|---|
| 109 | (unless incoming |
|---|
| 110 | (return-from process-incoming)) |
|---|
| 111 | (log "Processing ~A incoming items." (length incoming)) |
|---|
| 112 | (let (table error) |
|---|
| 113 | (dolist (file incoming) |
|---|
| 114 | (setf error nil) |
|---|
| 115 | (log "Analyzing ~A." file) |
|---|
| 116 | (setf table |
|---|
| 117 | (handler-case |
|---|
| 118 | (process file) |
|---|
| 119 | (t (e) |
|---|
| 120 | (log "Failed to process ~A because ~A" file e) |
|---|
| 121 | (setf error e)))) |
|---|
| 122 | (if error |
|---|
| 123 | (reject-input file (if (listp error) error (list error))) |
|---|
| 124 | (multiple-value-bind (valid invalid) |
|---|
| 125 | (validate table) |
|---|
| 126 | (if invalid |
|---|
| 127 | (progn |
|---|
| 128 | (log "Rejecting ~A because of invalid rows." file) |
|---|
| 129 | (reject-input file invalid)) |
|---|
| 130 | (let ((incoming |
|---|
| 131 | (make-pathname :defaults *queue* |
|---|
| 132 | :name (pathname-name file) |
|---|
| 133 | :type (pathname-type file)))) |
|---|
| 134 | (log "Inserting ~A." incoming) |
|---|
| 135 | (rename-file file incoming)))))))))) |
|---|
| 136 | |
|---|