(ld blobserver.Loader, conf jsonconfig.Obj)
// validateOnStartDefault is the fallback passed for the "validateOnStart"
// config option in newSyncFromConfig. It is parsed once, at package
// initialization, from the CAMLI_SYNC_VALIDATE environment variable.
// The ParseBool error is deliberately discarded: ParseBool returns false
// alongside any error, so an unset or unparsable variable leaves the
// default at false.
| 132 | var validateOnStartDefault, _ = strconv.ParseBool(os.Getenv("CAMLI_SYNC_VALIDATE")) |
| 133 | |
| 134 | func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler, error) { |
| 135 | var ( |
| 136 | from = conf.RequiredString("from") |
| 137 | to = conf.RequiredString("to") |
| 138 | fullSync = conf.OptionalBool("fullSyncOnStart", false) |
| 139 | blockFullSync = conf.OptionalBool("blockingFullSyncOnStart", false) |
| 140 | idle = conf.OptionalBool("idle", false) |
| 141 | queueConf = conf.OptionalObject("queue") |
| 142 | copierPoolSize = conf.OptionalInt("copierPoolSize", 5) |
| 143 | validate = conf.OptionalBool("validateOnStart", validateOnStartDefault) |
| 144 | hourlyCompare = conf.OptionalInt("hourlyCompareBytes", 0) |
| 145 | ) |
| 146 | if err := conf.Validate(); err != nil { |
| 147 | return nil, err |
| 148 | } |
| 149 | if idle { |
| 150 | return newIdleSyncHandler(from, to), nil |
| 151 | } |
| 152 | if len(queueConf) == 0 { |
| 153 | return nil, errors.New(`Missing required "queue" object`) |
| 154 | } |
| 155 | q, err := sorted.NewKeyValueMaybeWipe(queueConf) |
| 156 | if err != nil { |
| 157 | return nil, err |
| 158 | } |
| 159 | |
| 160 | isToIndex := false |
| 161 | fromBs, err := ld.GetStorage(from) |
| 162 | if err != nil { |
| 163 | return nil, err |
| 164 | } |
| 165 | toBs, err := ld.GetStorage(to) |
| 166 | if err != nil { |
| 167 | return nil, err |
| 168 | } |
| 169 | if _, ok := fromBs.(*index.Index); !ok { |
| 170 | if _, ok := toBs.(*index.Index); ok { |
| 171 | isToIndex = true |
| 172 | } |
| 173 | } |
| 174 | |
| 175 | sh := newSyncHandler(from, to, fromBs, toBs, q) |
| 176 | sh.toIndex = isToIndex |
| 177 | sh.copierPoolSize = copierPoolSize |
| 178 | if err := sh.readQueueToMemory(); err != nil { |
| 179 | return nil, fmt.Errorf("Error reading sync queue to memory: %v", err) |
| 180 | } |
| 181 | |
| 182 | if fullSync || blockFullSync { |
| 183 | sh.logf("Doing full sync") |
| 184 | didFullSync := make(chan bool, 1) |
| 185 | go func() { |
| 186 | for { |
| 187 | n := sh.runSync("pending blobs queue", sh.enumeratePendingBlobs) |
| 188 | if n > 0 { |
| 189 | sh.logf("Queue sync copied %d blobs", n) |
| 190 | continue |
| 191 | } |
nothing calls this directly
no test coverage detected