source: trunk/abcl/src/org/armedbear/lisp/threads-jss.lisp @ 13621

Last change on this file since 13621 was 13621, checked in by Mark Evenson, 12 years ago

Continue to refine runnable names.

File size: 4.7 KB
Line 
1;;;; Copyright (C) 2011 Mark Evenson
2
3(in-package #:threads)
4
(require 'abcl-contrib)
;;; JSS provides the #"method" reader syntax used throughout this file,
;;; and the forms it produces call into JSS when they run.  It therefore
;;; has to be present when the file is compiled, when the compiled file
;;; is loaded into a fresh image, and when the source is loaded directly;
;;; requiring it at compile time only covers the first of those.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (require 'jss))
8
;;; Runtime state.  These are DEFVARs rather than DEFPARAMETERs so that
;;; reloading this file while a server is running does not throw away the
;;; pool and future handles that STOP-SERVER needs in order to shut it down.

(defvar *server-running-p* nil
  "Set to T at the end of START-SERVER and back to NIL by STOP-SERVER.")

;;; XXX possibly need multiple thread pools
(defvar *thread-pool* nil
  "The current JVM class implementing the ScheduledThreadPool abstraction.")

(defvar *scheduled-futures* nil
  "Further futures that STOP-SERVER cancels in addition to the two watchers.")

(defvar *incoming-scheduled-future* nil
  "Future stored by SCHEDULE-THREADS for the incoming watcher.")

(defvar *watch-queue-future* nil
  "Future stored by SCHEDULE-THREADS for the queue watcher.")
17
18
19;;;; Configure the directories for a threadpool from these defaults.
(defparameter *root* #p"/var/tmp/abcl-threads/"
  "Directory that all other thread-pool directories are merged against.")

(defparameter *logs* (merge-pathnames "logs/" *root*)
  "Directory in which START-SERVER opens its log file.")

(defparameter *incoming* (merge-pathnames "incoming/" *root*)
  "Directory whose contents PROCESS-INCOMING lists on every run.")

(defparameter *queue* (merge-pathnames "queue/" *root*)
  "Directory that PROCESS-INCOMING renames accepted files into.")

(defparameter *processed* (merge-pathnames "processed/" *root*)
  "The processed/ directory under *ROOT*; not referenced by the code visible here.")

;;; Defined after the individual directories so that it can name all of
;;; them.  START-SERVER creates every entry; listing only *INCOMING* left
;;; the log and queue directories missing on a fresh machine.
(defparameter *dirs* (list *incoming* *logs* *queue* *processed*)
  "Directories that START-SERVER ensures exist before scheduling work.")
30
31
32;;;; A simple logging abstraction.
33
(defconstant +month-names+ '("Jan" "Feb" "Mar" "Apr" "May" "Jun"
                             "Jul" "Aug" "Sep" "Oct" "Nov" "Dec")
  "Abbreviated month names; FORMAT-TIME indexes this list with month - 1.")

(defconstant +seconds+ (java:jfield "java.util.concurrent.TimeUnit" "SECONDS")
  "The SECONDS field of java.util.concurrent.TimeUnit, used as the unit when scheduling.")

;;; DEFVAR, not DEFPARAMETER: START-SERVER replaces this with an open log
;;; file, and reloading the file must not silently point logging back at
;;; standard output while that file is still open.
(defvar *log* *standard-output*
  "Destination passed to FORMAT by the LOG macro; NIL disables logging.")
38
(defun format-time (universal-time)
  "Return UNIVERSAL-TIME formatted as month name, day, and HH:MM:SS.

The month is looked up in +MONTH-NAMES+, the day of the month is printed
unpadded, and hour, minute and second are zero-padded to two digits.
DECODE-UNIVERSAL-TIME is called without a time-zone argument."
  (multiple-value-bind
        (second minute hour date month year day-of-week dst-p tz)
      (decode-universal-time universal-time)
    ;; Only the first five values are printed; declaring the rest ignored
    ;; keeps the compiler from reporting unused variables.
    (declare (ignore year day-of-week dst-p tz))
    (format nil "~&~A ~A ~2,'0D:~2,'0D:~2,'0D"
            (nth (1- month) +month-names+) date hour minute second)))
45
;;; NOTE(review): LOG is also the name of a COMMON-LISP function.  Confirm
;;; that the THREADS package shadows that symbol before relying on this
;;; definition; the package definition is not visible from this file.
(defmacro log (message &rest parameters)
  "Write a timestamped line to *LOG*, if *LOG* is non-NIL.

MESSAGE is a FORMAT control and PARAMETERS are its arguments.  The line
consists of the FORMAT-TIME rendering of the current time, a space, the
formatted message, and a fresh-line.  Expands to a form that returns NIL."
  (let ((stream (gensym "LOG-STREAM")))
    ;; Read *LOG* once and emit the whole line with a single FORMAT call,
    ;; so that the destination cannot change part-way through a message
    ;; and output from different threads is less likely to interleave.
    `(let ((,stream *log*))
       (when ,stream
         (format ,stream "~A ~?~&"
                 (format-time (get-universal-time))
                 ,message
                 (list ,@parameters))))))
51
52;;; Start a pool of hungry philosophers.
(defun start-server ()
  "Prepare directories, open the log file, and schedule the watcher tasks.

Signals an error when *SERVER-RUNNING-P* is already true.  Every
directory in *DIRS*, plus the directory of the log file itself, is
created if missing.  On success *LOG* is the opened log file,
*SERVER-RUNNING-P* is T, and T is returned."
  (when *server-running-p*
    (error "Server not recorded as stopped."))
  (let ((logfile (merge-pathnames "abcl-threads.log" *logs*)))
    ;; ENSURE-DIRECTORIES-EXIST returns its (non-NIL) argument on success
    ;; and signals an error itself on failure, so EVERY is true once all
    ;; directories are in place.  The log file's directory is ensured
    ;; explicitly because OPEN with :IF-DOES-NOT-EXIST :CREATE creates
    ;; the file but not its parent directories.
    (unless (and (every #'ensure-directories-exist *dirs*)
                 (ensure-directories-exist logfile))
      (error "Failed to create directories under '~A'." *root*))
    (setf *log*
          (open logfile :direction :output :if-exists :append :if-does-not-exist :create))
    (format *standard-output* "Logging to ~A." logfile))
  (log "Starts.")
  (schedule-threads)
  (setf *server-running-p* t))
66
(defun stop-server (&key (force nil))
  "Cancel the scheduled futures, shut down the thread pool, and close the log.

Unless FORCE is true, signals an error when *SERVER-RUNNING-P* is NIL.
With FORCE the function tolerates a server that was never (fully)
started: unset futures and an unset pool are skipped.  Returns NIL and
leaves *SERVER-RUNNING-P* set to NIL."
  (unless (or force *server-running-p*)
    (error "Server not recorded as running."))
  (log "Stopping the server.")
  (dolist (future (list* *incoming-scheduled-future*
                         *watch-queue-future*
                         *scheduled-futures*))
    ;; A slot is NIL when its task was never scheduled; calling a Java
    ;; method on NIL would fail, so such slots are skipped.
    (when (and future
               (not (or (#"isCancelled" future)
                        (#"isDone" future))))
      (#"cancel" future t)))
  (when *thread-pool*
    (#"shutdown" *thread-pool*))
  ;; Close only a log file (the kind of stream START-SERVER opens), never
  ;; standard output or some other stream installed by the user.  *LOG*
  ;; is redirected first so that later LOG calls do not write to a
  ;; closed stream.
  (let ((stream *log*))
    (when (typep stream 'file-stream)
      (setf *log* *standard-output*)
      (close stream)))
  (setf *server-running-p* nil))
79
(defun schedule-threads ()
  "Create a one-thread scheduled pool and schedule the two watcher tasks.

Any pool already stored in *THREAD-POOL* is shut down before it is
replaced.  The queue watcher (from MAKE-WATCH-QUEUE) starts after 10
seconds and the incoming watcher (from MAKE-PROCESS-INCOMING) after 1
second; both are rescheduled with a 1 second delay between runs.  The
resulting futures are stored in *WATCH-QUEUE-FUTURE* and
*INCOMING-SCHEDULED-FUTURE*.  INITIALIZE-QUEUE and MAKE-WATCH-QUEUE are
defined outside the code visible here."
  (log "Starting thread pool.")
  (when *thread-pool*
    (log "Removing existing incoming thread pool.")
    ;; Overwriting the variable alone would leave the old executor and
    ;; whatever it has scheduled running with no handle left to stop it.
    ;; Shutting down a pool that is already shut down has no effect.
    (#"shutdown" *thread-pool*))
  (setf *thread-pool*
        (#"newScheduledThreadPool" 'java.util.concurrent.Executors 1))
  (#"setExecuteExistingDelayedTasksAfterShutdownPolicy" *thread-pool* nil)
  (initialize-queue)
  (log "Scheduling queue watcher.")
  (setf *watch-queue-future*
        (#"scheduleWithFixedDelay"
         *thread-pool*
         (make-watch-queue) 10 1 +seconds+))
  (log "Scheduling incoming watcher.")
  (setf *incoming-scheduled-future*
        (#"scheduleWithFixedDelay"
         *thread-pool*
         (make-process-incoming) 1 1 +seconds+)))
98
(defun make-process-incoming ()
  "Return a java.lang.Runnable whose run method calls PROCESS-INCOMING.

An error signalled by PROCESS-INCOMING is logged and not propagated to
the caller of run."
  (java:jinterface-implementation
   "java.lang.Runnable"
   "run"
   (lambda ()
     ;; A periodic task that exits with an exception is not run again by
     ;; the scheduled executor, so one bad pass would silently end the
     ;; incoming watcher.  Record the problem and let the next scheduled
     ;; run try again.
     (handler-case (process-incoming)
       (error (e)
         (log "Incoming watcher failed: ~A" e))))))
101
(defun process-incoming ()
  "Examine every entry in *INCOMING* and move the acceptable ones to *QUEUE*.

Each file is passed to the local PROCESS function and the result to
VALIDATE (defined outside the code visible here), which is used as
returning two values; a non-NIL second value causes the file to be
rejected.  Accepted files are renamed into *QUEUE* under the same name
and type.  Rejection currently only issues a warning.  Returns NIL.

NOTE(review): a rejected file is left in *INCOMING*, so it appears to be
examined again on every run -- confirm whether that is intended."
  (flet ((reject-input (file invalid)
           ;; Hand the control string and arguments to WARN directly.
           ;; Pre-formatting the text and passing the result as WARN's
           ;; format control misfires when a file name contains a tilde.
           (warn "~A is ~A" file invalid))
         (process (file)
           ;; Placeholder: nothing is parsed yet, so the table is NIL.
           (declare (ignore file))
           nil))
    (let ((incoming (directory (merge-pathnames *incoming* "*"))))
      (unless incoming
        (return-from process-incoming))
      (log "Processing ~A incoming items." (length incoming))
      (dolist (file incoming)
        (log "Analyzing ~A." file)
        (let* ((failure nil)
               (table
                 ;; Catch SERIOUS-CONDITION rather than T: a clause for T
                 ;; would also intercept warnings and other harmless
                 ;; signals and treat them as a failed file.
                 (handler-case (process file)
                   (serious-condition (e)
                     (log "Failed to process ~A because ~A" file e)
                     (setf failure e)
                     nil))))
          (if failure
              (reject-input file (list failure))
              (multiple-value-bind (valid invalid)
                  (validate table)
                (declare (ignore valid))
                (if invalid
                    (progn
                      (log "Rejecting ~A because of invalid rows." file)
                      (reject-input file invalid))
                    (let ((destination
                            (make-pathname :defaults *queue*
                                           :name (pathname-name file)
                                           :type (pathname-type file))))
                      (log "Inserting ~A." destination)
                      ;; RENAME-FILE does not create the target directory.
                      (ensure-directories-exist destination)
                      (rename-file file destination))))))))))
136
Note: See TracBrowser for help on using the repository browser.