#include "RS-MySQL.h"
/*
* RS_MySQL_dbApply.
*
* R/S: dbApply(rs, INDEX, FUN, group.begin, group.end, end, ...)
*
* This first implementation of R's dbApply()
* extracts rows from an open result set rs and applies functions
* to those rows of each group. This is how it works: it keeps track of
* the values of the field pointed to by "group" and it identifies events:
* BEGIN_GROUP (just read the first row of a different group),
* NEW_RECORD (every record fetched generates this event),
* and END_GROUP (just finished with the current group). At these points
* we invoke the R functions group.end() and group.begin() in the
* environment() of dbApply
* [should it be the environment where dbApply was called from (i.e.,
* dbApply's parent's frame)?]
* Except for the very first group, the order of invocation is
* group.end() followed by group.begin()
*
* NOTE: We're thinking of groups as commonly defined in awk scripts
* (but also in SAP's ABAP/4) where rows are assumed to be sorted by
* the "group" fields and we detect a different (new) group when any of
* the "group" fields changes. Our implementation does not require
* the result set to be sorted by group, but for performance's sake,
* it had better be.
*
* TODO: 1. Notify the reason for exiting (normal, exhausted maxBatches, etc.)
* 2. Allow INDEX to be a list, as in tapply().
* 3. Handle NA's (SQL NULL's) in the INDEX and/or data fields.
* Currently they are ignored, thus effectively causing a
* new BEGIN_GROUP event.
* 4. Re-write fetch() in terms of events (END_OF_DATA,
* EXHAUST_DATAFRAME, DB_ERROR, etc.)
* 5. Create a table of R callback functions indexed by events,
* then a handle_event() could conveniently handle all the events.
*/
/* Forward declarations of helper routines; their definitions are not
 * in this part of the file.
 */
/* NOTE(review): presumably returns `old` grown to `new_len` elements --
 * confirm against the definition.
 */
SEXP expand_list(SEXP old, int new_len);
/* NOTE(review): presumably records the name of group number `ngroup` in
 * `group_names`, taken from row `i` of column `group` in `data` --
 * confirm against the definition.
 */
void add_group(SEXP group_names, SEXP data,
SEXPTYPE *fld_Sclass, int group,
int ngroup, int i);
/* NOTE(review): presumably returns a combination of the event masks
 * defined below for the cell at (`row`, `col`) of `data` -- confirm
 * against the definition.
 */
unsigned int check_groupEvents(SEXP data, SEXPTYPE fld_Sclass[],
int row, int col);
/* The following are the masks for the events/states we recognize as we
 * bring rows from the result set/cursor. Every non-zero value is a
 * distinct power of two, so the values can be combined and tested as
 * bit masks.
 */
#define NEVER 0 /* no event; initial value of the event variable */
#define BEGIN 1 /* prior to reading the 1st row from the result set */
#define END 2 /* after reading the last row from the result set */
#define BEGIN_GROUP 4 /* just read in the 1st row of a different group */
#define END_GROUP 8 /* just read the last row of the current group */
#define NEW_RECORD 16 /* uninteresting: every record fetched generates it */
#define PARTIAL_GROUP 32 /* too much data (> max_rec): partial buffer */
/* the following are non-grouping events (e.g., db errors, memory) */
#define EXHAUSTED_DF 64 /* exhausted the allocated data.frame */
#define EXHAUSTED_OUT 128 /* exhausted the allocated output list */
#define END_OF_DATA 256 /* end of data from the result set */
#define DBMS_ERROR 512 /* error in remote dbms */
/*
 * Invoke the group-begin callback. The callback takes a single
 * argument: the name of the current group, passed as a character
 * vector of length one.
 *
 *   callObj    -- call object, already initialized by the caller; its
 *                 first argument slot (CADR) is overwritten in place
 *   group_name -- C string holding the group name
 *   rho        -- environment in which the call is evaluated
 *
 * The value produced by the callback is discarded; R_NilValue is
 * always returned.
 */
SEXP
RS_DBI_invokeBeginGroup(SEXP callObj, const char *group_name, SEXP rho)
{
    /* wrap the C string in a length-one R character vector */
    SEXP name_arg = PROTECT(NEW_CHARACTER((int) 1));

    SET_CHR_EL(name_arg, 0, mkChar(group_name));

    /* install it as the call's first argument, then run the call */
    SETCADR(callObj, name_arg);
    (void) eval(callObj, rho);

    UNPROTECT(1);
    return R_NilValue;
}
/*
 * Invoke the new-record callback on a copy of the given record.
 *
 *   callObj    -- call object, already initialized by the caller; its
 *                 first argument slot (CADR) is overwritten in place
 *   new_record -- the record to hand to the callback (described by the
 *                 original author as a 1-row data.frame); it is
 *                 duplicated before being placed in the call
 *   rho        -- environment in which the call is evaluated
 *
 * The value produced by the callback is discarded; R_NilValue is
 * always returned.
 */
SEXP
RS_DBI_invokeNewRecord(SEXP callObj, SEXP new_record, SEXP rho)
{
    /* the callback receives a duplicate of the record, not the original */
    SEXP record_copy = PROTECT(duplicate(new_record));

    SETCADR(callObj, record_copy);
    (void) eval(callObj, rho);

    UNPROTECT(1);
    return R_NilValue;
}
/*
 * Invoke the group-end callback. The callback takes two arguments, a
 * data.frame and the group name; the `...` symbol is placed in the
 * third argument slot of the call.
 *
 *   callObj    -- call object template; it is duplicated, so the
 *                 caller's object is not modified
 *   data       -- the group's data; duplicated before being passed on
 *   group_name -- C string holding the group name
 *   rho        -- environment in which the call is evaluated
 *
 * Returns whatever the callback evaluates to. The result is no longer
 * protected once this function returns.
 */
SEXP
RS_DBI_invokeEndGroup(SEXP callObj, SEXP data,
                      const char *group_name, SEXP rho)
{
    SEXP call_copy, data_copy, name_arg, result;
    int nprot = 0;    /* number of outstanding PROTECT()'s */

    /* work on private copies of the call and of the data */
    PROTECT(call_copy = duplicate(callObj));
    ++nprot;
    PROTECT(data_copy = duplicate(data));
    ++nprot;

    /* wrap the C string in a length-one R character vector */
    PROTECT(name_arg = NEW_CHARACTER((int) 1));
    ++nprot;
    SET_CHR_EL(name_arg, 0, mkChar(group_name));

    /* fill the three argument slots: data, group name, `...` */
    SETCADR(call_copy, data_copy);
    SETCADDR(call_copy, name_arg);
    SETCADDDR(call_copy, R_DotsSymbol);

    result = eval(call_copy, rho);

    UNPROTECT(nprot);
    return result;
}
SEXP /* output is a named list */
RS_MySQL_dbApply(SEXP rsHandle, /* resultset handle */
SEXP s_group_field,/* this is a 0-based field number */
SEXP s_funs, /* a 5-elem list with handler funs */
SEXP rho, /* the env where to run funs */
SEXP s_batch_size, /* alloc these many rows */
SEXP s_max_rec) /* max rows per group */
{
RS_DBI_resultSet *result;
RMySQLFields* flds;
MYSQL_RES *my_result;
MYSQL_ROW row;
SEXP data, cur_rec, out_list, group_names, val;
unsigned long *lens = (unsigned long *)0;
SEXPTYPE *fld_Sclass;
int i, j, null_item, expand, completed;
int num_rec, num_groups;
int num_fields;
int max_rec = INT_EL(s_max_rec,0); /* max rec per group */
int ngroup = 0, group_field = INT_EL(s_group_field,0);
long total_records;
int pushed_back = FALSE;
unsigned int event = NEVER;
int np = 0; /* keeps track of PROTECT()'s */
SEXP beginGroupCall, beginGroupFun = LST_EL(s_funs, 2);
SEXP endGroupCall, endGroupFun = LST_EL(s_funs, 3);
SEXP newRecordCall, newRecordFun = LST_EL(s_funs, 4);
int invoke_beginGroup = (GET_LENGTH(beginGroupFun)>0);
int invoke_endGroup = (GET_LENGTH(endGroupFun)>0);
int invoke_newRecord = (GET_LENGTH(newRecordFun)>0);
row = NULL;
beginGroupCall = R_NilValue; /* -Wall */
if(invoke_beginGroup){
PROTECT(beginGroupCall=lang2(beginGroupFun, R_NilValue));
++np;
}
endGroupCall = R_NilValue; /* -Wall */
if(invoke_endGroup){
/* TODO: append list(...) to the call object */
PROTECT(endGroupCall = lang4(endGroupFun, R_NilValue,
R_NilValue, R_NilValue));
++np;
}
newRecordCall = R_NilValue; /* -Wall */
if(invoke_newRecord){
PROTECT(newRecordCall = lang2(newRecordFun, R_NilValue));
++np;
}
result = RS_DBI_getResultSet(rsHandle);
flds = result->fields;
if(!flds)
error("corrupt resultSet, missing fieldDescription");
num_fields = flds->num_fields;
fld_Sclass = flds->Sclass;
PROTECT(data = NEW_LIST((int) num_fields)); /* buffer records */
PROTECT(cur_rec = NEW_LIST((int) num_fields)); /* current record */
np += 2;
RS_DBI_allocOutput(cur_rec, flds, (int) 1, 0);
make_data_frame(cur_rec);
num_rec = INT_EL(s_batch_size, 0); /* this is num of rec per group! */
max_rec = INT_EL(s_max_rec,0); /* max rec **per group** */
num_groups = num_rec;
PROTECT(out_list = NEW_LIST(num_groups));
PROTECT(group_names = NEW_CHARACTER(num_groups));
np += 2;
/* set conversion for group names */
if(result->rowCount==0){
event = BEGIN;
/* here we could invoke the begin function*/
}
/* actual fetching.... */
my_result = (MYSQL_RES *) result->drvResultSet;
completed = (int) 0;
total_records = 0;
expand = 0; /* expand or init each data vector? */
i = 0; /* index into row number **within** groups */
while(1){
if(i==0 || i==num_rec){ /* BEGIN, EXTEND_DATA, BEGIN_GROUP */
/* reset num_rec upon a new group, double it if needs to expand */
num_rec = (i==0) ? INT_EL(s_batch_si
评论0