1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.myfaces.component.html.util;
20
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.apache.myfaces.component.html.util.StreamingAddResource.StreamablePositionedInfo;
28
29 public class StreamingThreadManager
30 {
31 public final static String KEY = "org.apache.myfaces.component.html.util.StreamingThreadManager";
32
33
34
35
36 private final Map headerInfos = new HashMap();
37
38 private Thread cleanupThread = null;
39
40 private CleanupThread cleanupThreadObject = null;
41
42
43
44
45 private volatile long REQUEST_ID_COUNTER = 0;
46
47 public StreamingThreadManager()
48 {
49 }
50
51 public void init()
52 {
53 if (cleanupThread == null)
54 {
55 cleanupThreadObject = new CleanupThread();
56 cleanupThread = new Thread(cleanupThreadObject, "StreamingAddResource.CleanupThread");
57 cleanupThread.setDaemon(true);
58 cleanupThread.start();
59 }
60 }
61
62 public void destroy()
63 {
64 if (cleanupThread != null)
65 {
66 cleanupThreadObject.done();
67 cleanupThread.interrupt();
68 cleanupThread = null;
69 synchronized (headerInfos)
70 {
71 headerInfos.clear();
72 }
73 }
74 }
75
76 public HeaderInfoEntry getHeaderInfo(Long requestId)
77 {
78 synchronized (headerInfos)
79 {
80 return (HeaderInfoEntry) headerInfos.get(requestId);
81 }
82 }
83
84 public Long putNewHeaderInfoEntry()
85 {
86 Long requestId = null;
87 synchronized(this)
88 {
89 REQUEST_ID_COUNTER++;
90 requestId = new Long(REQUEST_ID_COUNTER);
91 }
92 HeaderInfoEntry headerInfoEntry = new HeaderInfoEntry();
93 synchronized (headerInfos)
94 {
95 headerInfos.put(requestId, headerInfoEntry);
96 }
97 return requestId;
98 }
99
100 public void removeHeaderInfo(Long requestId)
101 {
102 synchronized (headerInfos)
103 {
104 headerInfos.remove(requestId);
105 }
106 }
107
108 public static class HeaderInfoEntry
109 {
110 private final long destroyTime = System.currentTimeMillis() + (1000 * 60);
111 private final List addedInfos = new ArrayList(10);
112 private volatile boolean requestDone = false;
113
114 protected HeaderInfoEntry()
115 {
116 }
117
118 protected boolean isDestroyable(long now)
119 {
120 return destroyTime < now;
121 }
122
123 protected void addInfo(StreamablePositionedInfo positionedInfo)
124 {
125 synchronized (addedInfos)
126 {
127 addedInfos.add(positionedInfo);
128 addedInfos.notifyAll();
129 }
130 }
131
132 protected StreamablePositionedInfo fetchInfo() throws InterruptedException
133 {
134 synchronized (addedInfos)
135 {
136 while (addedInfos.size() < 1 && !requestDone)
137 {
138 addedInfos.wait(100);
139 }
140 if (addedInfos.size() < 1)
141 {
142
143 return null;
144 }
145
146 return (StreamablePositionedInfo) addedInfos.remove(0);
147 }
148 }
149
150 protected void setRequestDone()
151 {
152 requestDone = true;
153 }
154 }
155
156 private class CleanupThread implements Runnable
157 {
158
159 private final static int CHECKS_PER_RUN = 10;
160
161
162 private final static int CACHE_LIMIT = 1000;
163
164 private boolean threadDone = false;
165
166 public void run()
167 {
168 while (!Thread.interrupted() && !threadDone)
169 {
170 checkMap();
171
172 try
173 {
174 Thread.sleep(1000 * 30);
175 }
176 catch (InterruptedException e)
177 {
178
179 }
180 }
181 }
182
183 public void done() {
184 threadDone = true;
185 }
186
187 private void checkMap()
188 {
189 synchronized (headerInfos)
190 {
191 long now = System.currentTimeMillis();
192
193 int checkNo = 0;
194 Iterator iterEntries = headerInfos.entrySet().iterator();
195 while (iterEntries.hasNext() && !Thread.currentThread().isInterrupted())
196 {
197 checkNo++;
198 if (headerInfos.size() < CACHE_LIMIT && checkNo > CHECKS_PER_RUN)
199 {
200 return;
201 }
202 Map.Entry entry = (Map.Entry) iterEntries.next();
203 HeaderInfoEntry headerInfoEntry = (HeaderInfoEntry) entry.getValue();
204 if (headerInfoEntry.isDestroyable(now))
205 {
206 iterEntries.remove();
207 }
208 }
209 }
210 }
211 }
212 }