NewDataFlowTester creates a *DataFlowTester to help developers test the data flow of their subtasks
(t *testing.T, pluginName string, pluginMeta plugin.PluginMeta)
| 100 | |
| 101 | // NewDataFlowTester create a *DataFlowTester to help developer test their subtasks data flow |
| 102 | func NewDataFlowTester(t *testing.T, pluginName string, pluginMeta plugin.PluginMeta) *DataFlowTester { |
| 103 | err := plugin.RegisterPlugin(pluginName, pluginMeta) |
| 104 | if err != nil { |
| 105 | panic(err) |
| 106 | } |
| 107 | cfg := config.GetConfig() |
| 108 | e2eDbUrl := cfg.GetString(`E2E_DB_URL`) |
| 109 | if e2eDbUrl == `` { |
| 110 | panic(errors.Default.New(`e2e can only run with E2E_DB_URL, please set it in environment variable or .env file`)) |
| 111 | } |
| 112 | cfg.Set(`DB_URL`, cfg.GetString(`E2E_DB_URL`)) |
| 113 | db, err := runner.NewGormDb(cfg, logruslog.Global) |
| 114 | if err != nil { |
| 115 | // if here fail with error `access denied for user` you need to create database by your self as follow command |
| 116 | // create databases lake_test; |
| 117 | // grant all on lake_test.* to 'merico'@'%'; |
| 118 | panic(err) |
| 119 | } |
| 120 | errors.Must(db.AutoMigrate(&models.SubtaskState{})) |
| 121 | df := &DataFlowTester{ |
| 122 | Cfg: cfg, |
| 123 | Db: db, |
| 124 | Dal: dalgorm.NewDalgorm(db), |
| 125 | T: t, |
| 126 | Name: pluginName, |
| 127 | Plugin: pluginMeta, |
| 128 | Log: logruslog.Global, |
| 129 | } |
| 130 | return df |
| 131 | } |
| 132 | |
| 133 | // ImportCsvIntoRawTable imports records from specified csv file into target raw table, note that existing data would be deleted first. |
| 134 | func (t *DataFlowTester) ImportCsvIntoRawTable(csvRelPath string, rawTableName string) { |