1 | ;;;; Copyright (C) 2011 Mark Evenson |
---|
2 | |
---|
3 | (in-package #:threads) |
---|
4 | |
---|
5 | (require 'abcl-contrib) |
---|
6 | (eval-when (:compile-toplevel) |
---|
7 | (require 'jss)) |
---|
8 | |
---|
9 | (defparameter *server-running-p* nil) |
---|
10 | |
---|
11 | ;;; XXX possibly need multiple thread pools |
---|
12 | (defparameter *thread-pool* nil |
---|
13 | "The current JVM class implementing the ScheduledThreadPool abstraction.") |
---|
14 | (defparameter *scheduled-futures* nil) |
---|
15 | (defparameter *incoming-scheduled-future* nil) |
---|
16 | (defparameter *watch-queue-future* nil) |
---|
17 | |
---|
18 | |
---|
19 | ;;;; Configure the directories for a threadpool from these defaults. |
---|
20 | (defparameter *root* #p"/var/tmp/abcl-threads/") |
---|
21 | |
---|
22 | (defparameter *logs* (merge-pathnames "logs/" *root*)) |
---|
23 | |
---|
24 | (defparameter *incoming* (merge-pathnames "incoming/" *root*)) |
---|
25 | (defparameter *dirs* (list *incoming*)) |
---|
26 | |
---|
27 | (defparameter *queue* (merge-pathnames "queue/" *root*)) |
---|
28 | |
---|
29 | (defparameter *processed* (merge-pathnames "processed/" *root*)) |
---|
30 | |
---|
31 | |
---|
32 | ;;;; A simple logging abstraction. |
---|
33 | |
---|
34 | (defconstant +month-names+ '("Jan" "Feb" "Mar" "Apr" "May" "Jun" |
---|
35 | "Jul" "Aug" "Sep" "Oct" "Nov" "Dec")) |
---|
36 | (defconstant +seconds+ (java:jfield "java.util.concurrent.TimeUnit" "SECONDS")) |
---|
37 | (defparameter *log* *standard-output*) |
---|
38 | |
---|
39 | (defun format-time (universal-time) |
---|
40 | (multiple-value-bind |
---|
41 | (second minute hour date month year day-of-week dst-p tz) |
---|
42 | (decode-universal-time universal-time) |
---|
43 | (format nil "~&~A ~A ~2,'0D:~2,'0D:~2,'0D" |
---|
44 | (nth (1- month) +month-names+) date hour minute second))) |
---|
45 | |
---|
46 | (defmacro log (message &rest parameters) |
---|
47 | `(when *log* |
---|
48 | (format *log* "~A " (format-time (get-universal-time))) |
---|
49 | (format *log* ,message ,@parameters) |
---|
50 | (format *log* "~&"))) |
---|
51 | |
---|
52 | ;;; Start a pool of hungry philosophers. |
---|
53 | (defun start-server () |
---|
54 | (when *server-running-p* |
---|
55 | (error "Server not recorded as stopped.")) |
---|
56 | (unless |
---|
57 | (mapcar #'ensure-directories-exist *dirs*) |
---|
58 | (error "Failed to create directories under '~A'." *root*)) |
---|
59 | (let ((logfile (merge-pathnames "abcl-threads.log" *logs*))) |
---|
60 | (setf *log* |
---|
61 | (open logfile :direction :output :if-exists :append :if-does-not-exist :create)) |
---|
62 | (format *standard-output* "Logging to ~A." logfile)) |
---|
63 | (log "Starts.") |
---|
64 | (schedule-threads) |
---|
65 | (setf *server-running-p* t)) |
---|
66 | |
---|
67 | (defun stop-server (&key (force nil)) |
---|
68 | (unless force |
---|
69 | (unless *server-running-p* |
---|
70 | (error "Server not recorded as running."))) |
---|
71 | (log "Stopping the server.") |
---|
72 | (dolist (future `(,*incoming-scheduled-future* ,*watch-queue-future* ,@*scheduled-futures*)) |
---|
73 | (when (not (or (#"isCancelled" future) |
---|
74 | (#"isDone" future))) |
---|
75 | (#"cancel" future t))) |
---|
76 | (#"shutdown" *thread-pool*) |
---|
77 | (close *log*) |
---|
78 | (setf *server-running-p* nil)) |
---|
79 | |
---|
80 | (defun schedule-threads () |
---|
81 | (log "Starting thread pool.") |
---|
82 | (when *thread-pool* |
---|
83 | (log "Removing existing incoming thread pool.")) |
---|
84 | (setf *thread-pool* |
---|
85 | (#"newScheduledThreadPool" 'java.util.concurrent.Executors 1)) |
---|
86 | (#"setExecuteExistingDelayedTasksAfterShutdownPolicy" *thread-pool* nil) |
---|
87 | (initialize-queue) |
---|
88 | (log "Scheduling queue watcher.") |
---|
89 | (setf *watch-queue-future* |
---|
90 | (#"scheduleWithFixedDelay" |
---|
91 | *thread-pool* |
---|
92 | (make-watch-queue) 10 1 +seconds+)) |
---|
93 | (log "Scheduling incoming watcher.") |
---|
94 | (setf *incoming-scheduled-future* |
---|
95 | (#"scheduleWithFixedDelay" |
---|
96 | *thread-pool* |
---|
97 | (make-process-incoming) 1 1 +seconds+))) |
---|
98 | |
---|
99 | (defun make-process-incoming () |
---|
100 | (java:jinterface-implementation "java.lang.Runnable" "run" #'process-incoming)) |
---|
101 | |
---|
102 | (defun process-incoming () |
---|
103 | (flet ((reject-input (file invalid) |
---|
104 | (warn (format nil "~A is ~A" file invalid))) |
---|
105 | (process (file) |
---|
106 | nil)) |
---|
107 | |
---|
108 | (let ((incoming (directory (merge-pathnames *incoming* "*")))) |
---|
109 | (unless incoming |
---|
110 | (return-from process-incoming)) |
---|
111 | (log "Processing ~A incoming items." (length incoming)) |
---|
112 | (let (table error) |
---|
113 | (dolist (file incoming) |
---|
114 | (setf error nil) |
---|
115 | (log "Analyzing ~A." file) |
---|
116 | (setf table |
---|
117 | (handler-case |
---|
118 | (process file) |
---|
119 | (t (e) |
---|
120 | (log "Failed to process ~A because ~A" file e) |
---|
121 | (setf error e)))) |
---|
122 | (if error |
---|
123 | (reject-input file (if (listp error) error (list error))) |
---|
124 | (multiple-value-bind (valid invalid) |
---|
125 | (validate table) |
---|
126 | (if invalid |
---|
127 | (progn |
---|
128 | (log "Rejecting ~A because of invalid rows." file) |
---|
129 | (reject-input file invalid)) |
---|
130 | (let ((incoming |
---|
131 | (make-pathname :defaults *queue* |
---|
132 | :name (pathname-name file) |
---|
133 | :type (pathname-type file)))) |
---|
134 | (log "Inserting ~A." incoming) |
---|
135 | (rename-file file incoming)))))))))) |
---|
136 | |
---|