(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *conn.Mgr)
| 1807 | } |
| 1808 | |
| 1809 | func createLogClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *conn.Mgr) (*logclient.LogClient, error) { |
| 1810 | var err error |
| 1811 | keepaliveCfg := GetKeepalive(&cfg.Config) |
| 1812 | keepaliveCfg.PermitWithoutStream = true |
| 1813 | client := logclient.NewLogClient(mgr.GetPDClient(), mgr.GetPDHTTPClient(), mgr.GetTLSConfig(), keepaliveCfg) |
| 1814 | |
| 1815 | err = client.Init(ctx, g, mgr.GetStorage()) |
| 1816 | if err != nil { |
| 1817 | return nil, errors.Trace(err) |
| 1818 | } |
| 1819 | defer func() { |
| 1820 | if err != nil { |
| 1821 | client.Close(ctx) |
| 1822 | } |
| 1823 | }() |
| 1824 | |
| 1825 | u, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions) |
| 1826 | if err != nil { |
| 1827 | return nil, errors.Trace(err) |
| 1828 | } |
| 1829 | |
| 1830 | opts := getExternalStorageOptions(&cfg.Config, u) |
| 1831 | if err = client.SetStorage(ctx, u, &opts); err != nil { |
| 1832 | return nil, errors.Trace(err) |
| 1833 | } |
| 1834 | client.SetCrypter(&cfg.CipherInfo) |
| 1835 | client.SetUpstreamClusterID(cfg.UpstreamClusterID) |
| 1836 | |
| 1837 | err = client.InitClients(ctx, u, cfg.logCheckpointMetaManager, cfg.sstCheckpointMetaManager, uint(cfg.PitrConcurrency), cfg.ConcurrencyPerStore.Value) |
| 1838 | if err != nil { |
| 1839 | return nil, errors.Trace(err) |
| 1840 | } |
| 1841 | |
| 1842 | err = client.SetRawKVBatchClient(ctx, cfg.PD, cfg.TLS.ToKVSecurity()) |
| 1843 | if err != nil { |
| 1844 | return nil, errors.Trace(err) |
| 1845 | } |
| 1846 | |
| 1847 | encryptionManager, err := encryption.NewManager(&cfg.LogBackupCipherInfo, &cfg.MasterKeyConfig) |
| 1848 | if err != nil { |
| 1849 | return nil, errors.Annotate(err, "failed to create encryption manager for log restore") |
| 1850 | } |
| 1851 | if err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS, cfg.MetadataDownloadBatchSize, encryptionManager); err != nil { |
| 1852 | return nil, errors.Trace(err) |
| 1853 | } |
| 1854 | |
| 1855 | client.SetRestoreID(cfg.RestoreID) |
| 1856 | |
| 1857 | return client, nil |
| 1858 | } |
| 1859 | |
| 1860 | // rangeFilterFromIngestRecorder rewrites the table id of items in the ingestRecorder |
| 1861 | // TODO: need to implement the range filter out feature |
no test coverage detected