本人有一存储过程,在非集群环境下可以跑出结果表来,但是集群环境下,结果表为空。
存储过程的语句为:
CREATE  PROCEDURE `PRO_TREND_EXTRACT`(IN IN_TBNAME VARCHAR(30),
IN IN_FOCUSFIELD VARCHAR(100), # 连续字段:BAL余额(只能选择一个)
IN IN_FIELD VARCHAR(2000), # 展示的字段
IN IN_WHERE VARCHAR(2000), # 筛选条件
IN IN_ORDERBY VARCHAR(2000),# 排序字段(可以选择升序或降序)
IN IN_GROUPBY VARCHAR(100), # 分组字段
IN IN_TREND VARCHAR(5), # 趋势 (ASC或DESC)
IN IN_TRENDSTEP VARCHAR(20), # 连续步长 100 
IN IN_RESULTTABLE VARCHAR(100), # 结果表 ZM_10049_M2476_2_M
IN IN_TABLESPACE VARCHAR(100),
IN IN_INDEXSPACE VARCHAR(100),
OUT OUT_CODE INTEGER,
OUT OUT_MSG VARCHAR(8000))
BEGIN
/*
存储过程名称:PRO_TREND_EXTRACT
存储过程中文:严格连续趋势提取
存储过程功能:提取按指定字段分组排序后具有严格连续增加或减小趋势的数据
*/
DECLARE SQLCODE INTEGER DEFAULT 0;
DECLARE GROUPBYSTR INTEGER;
DECLARE FOCUSFIELD VARCHAR(100);
DECLARE TMP_GRDATA INTEGER;
DECLARE TMP_FSDATA VARCHAR(100);
DECLARE TMP_CTDATA VARCHAR(100);
DECLARE FLAG INTEGER;
DECLARE COUNT INTEGER;  
DECLARE OPR VARCHAR(1);
DECLARE NUMSTR VARCHAR(8000) DEFAULT '';
DECLARE V_SQL VARCHAR(8000);# 开始=====================================================================
DECLARE COL_NUMS INTEGER DEFAULT 0; # 排序字段个数
DECLARE RANKS VARCHAR(2000) DEFAULT ''; # 拼接多个排序字段
DECLARE ZIDUAN VARCHAR(2000) DEFAULT '';# 拼接多个显示字段
DECLARE COL_NAME VARCHAR(100); # 单个排序字段
DECLARE COUNT_WXF INTEGER DEFAULT 0; # 计数使用的
DECLARE DONE INT DEFAULT FALSE; # 游标结束标志(TRUE表示游标结束)
DECLARE CUR CURSOR FOR SELECT FIELD FROM TREND_EXTRACT_GROUPBY_TMP;
# 结束=====================================================================

DECLARE TRD_TASK CURSOR FOR SELECT FOUSFIELD,COMPAREFIELD,GROUPFLAG FROM TREND_EXTRACT_OVER_TMP;
DECLARE CONTINUE HANDLER FOR NOT FOUND
BEGIN
SET SQLCODE=0;
SET OUT_CODE=SQLCODE;
SET DONE = TRUE;
END;
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION
BEGIN
SET SQLCODE=-1;
SET OUT_CODE=SQLCODE;
SET OUT_MSG = CONCAT('ERROR:',V_SQL);
END;

#create table if not exists wxf_sql_log(field text);# 开始=====================================================================
SET COL_NUMS = LENGTH(IN_GROUPBY)-LENGTH(REPLACE(IN_GROUPBY,',',''))+1;
SET @I=1;
  # TREND_EXTRACT_GROUPBY_TMP临时表存放的是拆分后的分组字段
CREATE TABLE IF NOT EXISTS TREND_EXTRACT_GROUPBY_TMP(FIELD text);
DELETE FROM TREND_EXTRACT_GROUPBY_TMP;
COMMIT;
WHILE @I<=COL_NUMS
DO
INSERT INTO TREND_EXTRACT_GROUPBY_TMP VALUES(SUBSTRING_INDEX(SUBSTRING_INDEX(IN_GROUPBY,',',@I),',',-1));
SET @I=@I+1;
END WHILE; OPEN CUR;
SET COUNT_WXF=0;
READ_LOOP:LOOP
FETCH CUR INTO COL_NAME;
IF DONE THEN
LEAVE READ_LOOP;
END IF;
SET COUNT_WXF:=COUNT_WXF+1;
IF COUNT_WXF=1 THEN
SET RANKS=CONCAT(RANKS,'@',COL_NAME,'=',COL_NAME);
ELSE
SET RANKS=CONCAT(RANKS,' AND ','@',COL_NAME,'=',COL_NAME);
END IF;
SET ZIDUAN=CONCAT(ZIDUAN,',@',COL_NAME,':=',COL_NAME,' AS ',COL_NAME,'1'); SET V_SQL=CONCAT('SET ','@',COL_NAME,'=null');
  SET @V_SQL=V_SQL;
PREPARE STMT FROM @V_SQL;
EXECUTE STMT;
DEALLOCATE PREPARE STMT; END LOOP;
CLOSE CUR;
SET @ROWNUM=0,@RANK=0;
# 结束=====================================================================
  # TREND_EXTRACT_OVER_TMP表存放的是分组排序后的数据
DROP TABLE IF EXISTS TREND_EXTRACT_OVER_TMP;
IF SQLCODE=0 THEN
SET V_SQL=CONCAT('CREATE TABLE IF NOT EXISTS ',IN_RESULTTABLE,' AS ( SELECT ',IN_FIELD,' FROM ',IN_TBNAME,' WHERE 1=2)');
SET @V_SQL=V_SQL;
PREPARE STMT FROM @V_SQL;
EXECUTE STMT;
DEALLOCATE PREPARE STMT;
END IF;

SET GROUPBYSTR=1;
SET FLAG=0; #组标识,为零标识新组
SET COUNT = 0;#组内计数器
SET NUMSTR='0';

#记录扫描(加入组标识)
IF UCASE(IN_TREND)='ASC' OR UCASE(IN_TREND)='ASCEND' THEN
SET OPR='-';
ELSE
SET OPR='+';
END IF; IF SQLCODE=0 THEN
  SET V_SQL=CONCAT('CREATE TABLE TREND_EXTRACT_OVER_TMP AS SELECT ',IN_FIELD,',',IN_FOCUSFIELD,' AS FOUSFIELD,',IN_FOCUSFIELD,OPR,IN_TRENDSTEP,' AS COMPAREFIELD,@ROWNUM:=@ROWNUM+1 AS ROWNUM,@RANK:=IF(',RANKS,',@RANK,@ROWNUM) AS GROUPFLAG',ZIDUAN,' FROM ',IN_TBNAME,' WHERE ',IN_WHERE,' ORDER BY ',IN_ORDERBY,',',IN_GROUPBY);
SET @V_SQL = V_SQL;
PREPARE STMT FROM @V_SQL;
EXECUTE STMT;
DEALLOCATE PREPARE STMT;
#insert into wxf_sql_log values(v_sql);
END IF; SET DONE=FALSE; #游标开始前,置成FALSE.
IF SQLCODE=0 THEN
OPEN TRD_TASK;
F_LOOP1:
LOOP
FETCH TRD_TASK INTO TMP_FSDATA,TMP_CTDATA,TMP_GRDATA;
IF DONE THEN
LEAVE F_LOOP1;
END IF;
#若换组,则进行上组扫描数据处理,若为满足要求的组,记录组号
IF GROUPBYSTR <> TMP_GRDATA THEN -- 表示开始读取新的一组数据
 IF COUNT>1 THEN  -- 如果count>1 ,说明 上一组一直复合连续条件,记录上一组的groupflag值。
SET NUMSTR=CONCAT(NUMSTR,',',GROUPBYSTR);
 END IF;
 SET FLAG=1; -- 设置 FLAG=1 表示 新的一组数据开始了  
ELSE
 SET FLAG=FLAG+1; -- 读取得这行数据跟上一行同一组。
END IF;

IF FLAG=1 THEN  -- 为1 表示新组,不需要比较
 SET COUNT=1; -- 新组 设置计数为1
ELSE
 IF COUNT>0 THEN
  IF UCASE(IN_TREND)='ASC' OR UCASE(IN_TREND)='ASCEND' THEN # 注意,持续增长和持续下跌的判断条件不同
IF IN_TRENDSTEP='0' THEN
IF TMP_CTDATA > FOCUSFIELD THEN
SET COUNT = COUNT + 1;
ELSE
SET COUNT=0;
END IF;
ELSE
IF TMP_CTDATA >= FOCUSFIELD THEN
SET COUNT = COUNT + 1;
ELSE
SET COUNT=0;
END IF;
END IF;
  ELSE 
   IF IN_TRENDSTEP='0' THEN
IF TMP_CTDATA < FOCUSFIELD THEN
SET COUNT = COUNT + 1;  
ELSE
SET COUNT=0;
END IF;
ELSE
IF TMP_CTDATA <= FOCUSFIELD THEN
SET COUNT = COUNT + 1;
ELSE
SET COUNT=0;
END IF;
END IF;
  END IF;
 END IF;  
END IF;

SET FOCUSFIELD=TMP_FSDATA;
SET GROUPBYSTR=TMP_GRDATA;

END LOOP F_LOOP1;
CLOSE TRD_TASK;
END IF; 
 
#insert into wxf_sql_log values(22222222);
#insert into wxf_sql_log values(SQLCODE);
#insert into wxf_sql_log values(COUNT);
IF SQLCODE=0 THEN
IF COUNT>1 THEN  -- 当游标读取完数据时,判断逻辑就不走了。如果COUNT=1,表示最后一条数据是新组,如果=0,表示上一条数据所在组不符合条件,如果>1,表示上一组数据符合趋势条件,应该提取,所以加到NUMSTR中。
SET NUMSTR=CONCAT(NUMSTR,',',GROUPBYSTR);
#insert into wxf_sql_log values('111111111');
#insert into wxf_sql_log values(NUMSTR);
END IF;
IF NUMSTR<>'0' THEN  -- 也就是说有复合条件的数据
SET V_SQL=CONCAT('INSERT INTO ',IN_RESULTTABLE,' SELECT distinct ',IN_FIELD,' FROM TREND_EXTRACT_OVER_TMP WHERE GROUPFLAG IN(',NUMSTR,')');
SET @V_SQL=V_SQL;
PREPARE STMT FROM @V_SQL;
EXECUTE STMT;
DEALLOCATE PREPARE STMT;
#insert into wxf_sql_log values(V_SQL);
END IF;
END IF; IF SQLCODE=0 THEN
SET OUT_CODE=SQLCODE;
SET OUT_MSG = 'SUCCESS!';
END IF;END调用的语句为:CALL PRO_TREND_EXTRACT (
'DFI_20150629115303765',
'CODE2',
'CODE1,CODE2,CODE3,CODE4',
'1=1',
'CODE2 ASC',
'CODE1',
'ASC',
'1',
'ZM_42518_M3631_1_M',
'UIA_TBS',
'UIA_IND' ,@1 ,@2
);用到的表为:CREATE TABLE `dfi_20150629115303765` (
  `BATCH_NO` varchar(32) DEFAULT NULL,
  `CODE1` varchar(255) DEFAULT NULL,
  `CODE2` varchar(255) DEFAULT NULL,
  `CODE3` varchar(255) DEFAULT NULL,
  `CODE4` varchar(255) DEFAULT NULL,
  `IS_SUM` varchar(40) DEFAULT NULL
) ENGINE=ndbcluster DEFAULT CHARSET=utf8;-- ----------------------------
-- Records of dfi_20150629115303765
-- ----------------------------
INSERT INTO `dfi_20150629115303765` VALUES ('6b59d00100ac467b911bad878e3d4ef5', '1', '19', '3', '啊啊', '0');
INSERT INTO `dfi_20150629115303765` VALUES ('6b59d00100ac467b911bad878e3d4ef9', '3', '23', '3', '啊啊', '0');
INSERT INTO `dfi_20150629115303765` VALUES ('6b59d00100ac467b911bad878e3d4ef7', '3', '21', '3', '啊啊', '0');
INSERT INTO `dfi_20150629115303765` VALUES ('6b59d00100ac467b911bad878e3d4ef6', '2', '20', '3', '啊啊', '0');
INSERT INTO `dfi_20150629115303765` VALUES ('6b59d00100ac467b911bad878e3d4ef8', '3', '22', '3', '啊啊', '0');