ETL第二篇 調(diào)用webservice
前言
這里使用ETL [Java代碼] 實現(xiàn)
代碼中使用axis調(diào)用webservice
在ETL提供了 Processor類中直接寫業(yè)務(wù)代碼, 而且是每次讀取一行數(shù)據(jù)

jar包準(zhǔn)備
將需要用到的jar包提前放到data-integration/lib 或 data-integration/libswt/對應(yīng)的目錄下
我這里為了方便, 將需要的包發(fā)到了data-integration/libswt/win64/下

用到的相關(guān)包: 鏈接: https://pan.baidu.com/s/1vpUppn-57sc9z3tFGux3uA 密碼: 3phi
代碼示例
這里提供一個簡單的示例供參考
import java.net.URL;
import javax.xml.namespace.QName;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.apache.axis.client.Call;
import org.apache.axis.client.Service;
public boolean processRow(StepMetaInterface smi,StepDataInterface sdi) throws KettleException{
Object[] r = getRow(); // 獲取輸入, 這里獲取流中的一行數(shù)據(jù)
if (r == null) {
setOutputDone(); // 結(jié)束輸出, 說明流中沒有數(shù)據(jù)了
return false;
}
// 獲取字段, 從r中獲取需要的字段
String pk_corp = get(Fields.In, "pk_corp").getString(r);
// 因為我這里需要給WS接口發(fā)送的數(shù)據(jù)是[{"pk_corp":pk_corp},{}...]
JSONArray syncDatas = new JSONArray();
JSONObject syncData = new JSONObject();
syncData.put("pk_corp",pk_corp);
syncDatas.add(syncData);
// 輸出數(shù)組, 會輸出到輸出流
Object[] outputRow = createOutputRow(r, data.outputRowMeta.size());
JSONObject result = null;
try {
// 輸出日志,也是ETL提供的方法
logBasic("【請求數(shù)據(jù)】" + syncDatas.toString());
// 調(diào)用ws接口
result = syncToNC(syncDatas.toString());
logBasic("【結(jié)果】" + result.toString());
} catch (Exception e) {
logError("【捕獲異常】" + e.getMessage());
// 這里獲取流中的字段并賦值, syncSign和errorLog是我在ETL[字段]中自定義的
get(Fields.Out, "syncSign").setValue( outputRow, '0' );
get(Fields.Out, "errorLog").setValue( outputRow, e.getMessage());
// 輸出數(shù)據(jù)傳輸?shù)较乱徊? 下一步接收的數(shù)據(jù)就會有我在[字段中]添加的字段
putRow(data.outputRowMeta, outputRow);
return true;
}
if( result.containsKey("code") && result.getInt("code") == 0) {
JSONArray resultDatas = result.getJSONArray("data");
JSONObject resultData = resultDatas.getJSONObject(0);
// 輸出字段
get(Fields.Out, "syncSign").setValue( outputRow, '1' );
get(Fields.Out, "newNCPK").setValue( outputRow, resultData.getString("ncPK"));
} else {
// 輸出字段
get(Fields.Out, "syncSign").setValue( outputRow, '2' );
get(Fields.Out, "errorLog").setValue( outputRow, result.getString("msg"));
}
putRow(data.outputRowMeta, outputRow);
return true;
}
// 調(diào)用webservice
private static JSONObject syncToNC(String str) throws Exception {
JSONObject jsonResult = null;
// 接口地址 ( 這里后面不加"?wsdl"
String endpoint = "http://localhost:8090/uapws/service/nc.ift.hs.ydrs.IAddStapplybService";
// targetNamespace
String targetNamespace = "http://ydrs.hs.ift.nc/IAddStapplybService";
// 調(diào)用接口中的方法
String operationName = "addStapplyb";
String result = null;
String message = null;
// 接口方法名
Service service = new Service();
Call call = (Call) service.createCall();
// 設(shè)置webservice地址
call.setTargetEndpointAddress(new URL(endpoint));
// 發(fā)布的方法名
call.setOperationName(new QName(targetNamespace, operationName));
// 設(shè)置參數(shù)
call.addParameter("string", org.apache.axis.encoding.XMLType.XSD_STRING, javax.xml.rpc.ParameterMode.IN);
// 返回類型
call.setReturnType(org.apache.axis.encoding.XMLType.XSD_STRING);
call.setEncodingStyle("utf-8");
call.setUseSOAPAction(true);
call.setSOAPActionURI(targetNamespace + operationName);
// 設(shè)置參數(shù)組
Object[] params = new Object[] { str };
// 調(diào)用接口
result = (String) call.invoke(params);
// 處理結(jié)果
if (result != null && ! "".equals(result)) {
jsonResult = JSONObject.fromObject(result);
}
return jsonResult;
}
ETL中使用Java代碼

自定義字段


浙公網(wǎng)安備 33010602011771號