實現隊列處理數據
需求描述:已有的數據接收接口為單線程,由于數據量激增、壓力過大,改進成項目啟動時創建兩個新線程:一個接收數據并直接存入隊列,另一個處理隊列中的數據,隊列為空時則等待。
實現過程中遇到的問題寫在前面。
- 在 Java 靜態方法(或靜態內部類)中無法直接使用依賴注入的 Service。解決辦法:在 @PostConstruct 方法中把當前 Bean 保存到靜態字段,之后通過該靜態字段調用 Service 層。
// Services injected by Spring into the bean instance.
@Autowired
private GatedateManager GatedateService;
@Autowired
private YjzjbkManager YjzjbkService ;
// Static handle to the bean instance so that static code can reach the
// injected services; it is null until init() has run.
public static QueueTest QueueTest;
/**
 * Publishes this bean through the static {@code QueueTest} field once
 * dependency injection has completed.
 *
 * The injected services are reached through that reference, so copying the
 * individual fields onto it (the same object) is unnecessary.
 */
@PostConstruct
public void init(){
    QueueTest = this;
}
PageData station = QueueTest.GatedateService.findByStation(pd);// at the call site
問題處理思路:
1. 我們創建一個監聽類,實現 ServletContextListener 接口,用于項目啟動時執行。
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import com.ycgis.controller.alertinfo.alertinfo.QueueTest.Input;
import com.ycgis.controller.alertinfo.alertinfo.QueueTest.Test14;
/**
 * Servlet context listener that starts the queue worker thread when the web
 * application is initialized and asks the worker threads to stop when the
 * application is destroyed.
 */
public class MyListener implements ServletContextListener {
    /** Optional notifier thread; it is not started by this listener. */
    private Input myThread;
    /** Worker thread started in {@link #contextInitialized(ServletContextEvent)}. */
    private Test14 myThread1;

    /**
     * Requests every thread owned by this listener to stop by interrupting it,
     * then drops the references.
     *
     * @param arg0 the context event supplied by the container (unused)
     */
    @Override
    public void contextDestroyed(ServletContextEvent arg0) {
        interruptIfPresent(myThread);
        interruptIfPresent(myThread1);
        myThread = null;
        myThread1 = null;
    }

    /**
     * Creates and starts the worker thread, at most once per listener instance.
     *
     * @param arg0 the context event supplied by the container (unused)
     */
    @Override
    public void contextInitialized(ServletContextEvent arg0) {
        // Guard on the thread that is actually started here.
        if (myThread1 == null) {
            myThread1 = new Test14();
            myThread1.start();
        }
    }

    /** Interrupts the given thread when it exists and is not already interrupted. */
    private static void interruptIfPresent(Thread thread) {
        if (thread != null && !thread.isInterrupted()) {
            thread.interrupt();
        }
    }
}
2.自定義一個 Class 線程類,并初始化隊列。
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.apache.commons.lang.StringEscapeUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Spring bean that owns the shared payload queue and the background threads
 * working on it.
 *
 * Producers add JSON-array strings to {@link #queue}; {@link Test14} takes them
 * off one by one and processes every element. The bean also publishes itself
 * through the static {@link #QueueTest} field so that the static nested thread
 * classes can reach the injected services.
 */
@Component
public class QueueTest{
    @Autowired
    private GatedateManager GatedateService;
    @Autowired
    private YjzjbkManager YjzjbkService ;

    /** Static handle to the bean instance; null until {@link #init()} has run. */
    public static QueueTest QueueTest;

    /**
     * Publishes this bean through the static field after injection. The
     * services are reached via that reference, so no per-field copy is needed.
     */
    @PostConstruct
    public void init(){
        QueueTest = this;
    }

    /** Typed view of the queue giving the worker access to the blocking take(). */
    private static final LinkedBlockingQueue<String> PENDING = new LinkedBlockingQueue<String>();

    /**
     * Queue of pending payloads. Same object as {@code PENDING}; the declared
     * type is kept as {@code Queue<String>} for existing callers.
     */
    public static final Queue<String> queue = PENDING;

    /** Switch read by callers before enqueueing; initial value is "1". */
    public static String qtype = "1";

    /**
     * Legacy notifier thread: while the queue is non-empty it notifies the
     * queue's monitor and prints a message. {@link Test14} blocks on the queue
     * itself and does not depend on this thread.
     */
    static class Input extends Thread{
        /** Pause between checks so the loop does not spin at full CPU. */
        private static final long POLL_INTERVAL_MILLIS = 100L;

        /** Runs until the thread is interrupted. */
        @Override
        public void run(){
            while (!isInterrupted()) {
                if (!queue.isEmpty()) {
                    // notify() may only be called while holding the monitor.
                    synchronized (queue) {
                        queue.notify();
                        System.err.println("我是喚醒:當前隊列中有新增內容!");
                    }
                }
                try {
                    Thread.sleep(POLL_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag and leave so the thread can be shut down.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Worker thread: takes payloads off the queue, blocking while it is empty,
     * and processes each one. Stops when interrupted.
     */
    static class Test14 extends Thread{
        /**
         * Blocks on take() instead of checking the size and then waiting on the
         * monitor, so a payload offered between the check and the wait can no
         * longer be missed.
         */
        @Override
        public void run(){
            try {
                while (!isInterrupted()) {
                    handle(PENDING.take());
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting for work: restore the flag and exit.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Parses one payload as a JSON array and builds a record for every
         * non-null element. Any failure, including a malformed payload, is
         * reported and the payload is dropped so the worker keeps running.
         */
        private void handle(String payload) {
            try {
                JSONArray arr = JSONArray.fromObject(payload);
                for (int i = 0; i < arr.size(); i++) {
                    JSONObject job = arr.getJSONObject(i);
                    if (job != null) {
                        PageData pd = new PageData();
                        pd.put("ID", UuidUtil.get32UUID());
                        pd.put("TYPE", "");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
3. 配置web監聽
<!-- Registers MyListener with the servlet container so that its
     contextInitialized / contextDestroyed callbacks are invoked. -->
<!-- NOTE(review): the package here (com.aaaa...) differs from the package used
     in the Java imports (com.ycgis...); confirm that listener-class matches
     the real fully-qualified name of MyListener. -->
<listener>
<listener-class>com.aaaa.controller.alertinfo.alertinfo.MyListener</listener-class>
</listener>
4. 正式接口來數據時的調用。
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Resource;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
/**
 * HTTP entry point under /appgate that accepts a JSON payload and hands it to
 * the shared queue for background processing.
 */
@Controller
@RequestMapping(value="/appgate")
public class IntGateinfoController extends BaseController {

    /**
     * Validates the caller and, when accepted, enqueues the request body and
     * notifies the queue's monitor.
     *
     * The "result" entry of the response is:
     * "01" when the payload was enqueued;
     * "03" when AppUtil.checkParam rejects the parameters;
     * "05" when uname does not equal the MD5 of the expected key name;
     * "00" when QueueTest.qtype is not "1" or any exception occurs.
     *
     * @param format raw request body
     * @param token  caller token, passed on to the parameter check
     * @param uname  caller key, compared with the MD5 of the expected key name
     * @return the object built by AppUtil.returnObject from the result map
     */
    @RequestMapping(value="/getgate")
    @ResponseBody
    public Object getAlert(@RequestBody String format,String token,String uname) throws Exception{
        logBefore(logger, "獲取到json數據");
        Map<String,Object> response = new HashMap<String,Object>();
        String code = "";

        PageData params = new PageData();
        params.put("TO_KEN", token);
        params.put("KEYNAME", uname);
        params.put("FORMAT", format);

        PageData expected = new PageData();
        expected.put("KEYNAME", "yjryxx");

        try {
            if (!MD5.md5(expected.getString("KEYNAME")).equals(uname)) {
                code = "05";
            } else if (!AppUtil.checkParam("getCheckPerson", params)) {
                code = "03";
            } else if (!QueueTest.qtype.equals("1")) {
                code = "00";
            } else {
                code = "01";
                QueueTest.queue.offer(format);
                // notify() must be called while holding the queue's monitor.
                synchronized (QueueTest.queue) {
                    QueueTest.queue.notify();
                }
            }
        } catch (Exception e) {
            logger.error(e.toString(), e);
            code = "00";
        } finally {
            response.put("result", code);
            logAfter(logger);
        }
        return AppUtil.returnObject(new PageData(), response);
    }
}
浙公網安備 33010602011771號