* Analyze a data pipeline, add needed operators.
(data, scope, ops)
| 148747 | /** |
| 148748 | * Analyze a data pipeline, add needed operators. |
| 148749 | */ function analyze(data, scope, ops) { |
| 148750 | const output = []; |
| 148751 | let source = null, modify = false, generate = false, upstream, i, n, t, m; |
| 148752 | if (data.values) { |
| 148753 | // hard-wired input data set |
| 148754 | if (isSignal(data.values) || hasSignal(data.format)) { |
| 148755 | // if either values is signal or format has signal, use dynamic loader |
| 148756 | output.push(load(scope, data)); |
| 148757 | output.push(source = collect()); |
| 148758 | } else // otherwise, ingest upon dataflow init |
| 148759 | output.push(source = collect({ |
| 148760 | $ingest: data.values, |
| 148761 | $format: data.format |
| 148762 | })); |
| 148763 | } else if (data.url) { |
| 148764 | // load data from external source |
| 148765 | if (hasSignal(data.url) || hasSignal(data.format)) { |
| 148766 | // if either url or format has signal, use dynamic loader |
| 148767 | output.push(load(scope, data)); |
| 148768 | output.push(source = collect()); |
| 148769 | } else // otherwise, request load upon dataflow init |
| 148770 | output.push(source = collect({ |
| 148771 | $request: data.url, |
| 148772 | $format: data.format |
| 148773 | })); |
| 148774 | } else if (data.source) { |
| 148775 | // derives from one or more other data sets |
| 148776 | source = upstream = (0, _vegaUtil.array)(data.source).map((d)=>ref(scope.getData(d).output)); |
| 148777 | output.push(null); // populate later |
| 148778 | } // scan data transforms, add collectors as needed |
| 148779 | for(i = 0, n = ops.length; i < n; ++i){ |
| 148780 | t = ops[i]; |
| 148781 | m = t.metadata; |
| 148782 | if (!source && !m.source) output.push(source = collect()); |
| 148783 | output.push(t); |
| 148784 | if (m.generates) generate = true; |
| 148785 | if (m.modifies && !generate) modify = true; |
| 148786 | if (m.source) source = t; |
| 148787 | else if (m.changes) source = null; |
| 148788 | } |
| 148789 | if (upstream) { |
| 148790 | n = upstream.length - 1; |
| 148791 | output[0] = Relay({ |
| 148792 | derive: modify, |
| 148793 | pulse: n ? upstream : upstream[0] |
| 148794 | }); |
| 148795 | if (modify || n) // collect derived and multi-pulse tuples |
| 148796 | output.splice(1, 0, collect()); |
| 148797 | } |
| 148798 | if (!source) output.push(collect()); |
| 148799 | output.push(Sieve({})); |
| 148800 | return output; |
| 148801 | } |
| 148802 | function collect(values) { |
| 148803 | const s = Collect({}, values); |
| 148804 | s.metadata = { |