From afdccecc4f1c2e02c9805fbf489997fb574f8239 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 13 Aug 2021 21:29:39 +0530 Subject: [PATCH 01/14] parallel decoding --- supervisor flow defined [wip] --- matrix/parallel_decoder_state.go | 153 +++++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 matrix/parallel_decoder_state.go diff --git a/matrix/parallel_decoder_state.go b/matrix/parallel_decoder_state.go new file mode 100644 index 0000000..848755f --- /dev/null +++ b/matrix/parallel_decoder_state.go @@ -0,0 +1,153 @@ +package matrix + +import ( + "context" + + "github.com/cloud9-tools/go-galoisfield" + "github.com/itzmeanjan/kodr" +) + +type OP uint8 + +const ( + SUB_AFTER_MULT OP = iota + 1 + DIVISION +) + +type ParallelDecoderState struct { + field *galoisfield.GF + // this is generation size = G + pieceCount uint + // #-of pieces received already + receivedCount uint + coeffs, coded Matrix + workCount uint + workerQueue []*work + workerCount uint + workerChans []chan struct{} + supervisorChan chan uint +} + +type work struct { + // 0-based work index which monotonically increases + idx uint + srcRow, dstRow uint + // weight is element of coefficient + // matrix i.e. field element + weight byte + // any of two possible row operations + op OP +} + +type workerState struct { + workerChan chan struct{} + decoderState *ParallelDecoderState + currentWorkIdx uint + totalWorkCount uint + columnStart, columnEnd uint +} + +func (p *ParallelDecoderState) createWork(src, dst uint, weight byte, op OP) { + w := work{srcRow: src, dstRow: dst, weight: weight, op: op} + p.workerQueue = append(p.workerQueue, &w) +} + +func (p *ParallelDecoderState) supervise(ctx context.Context) { + var ( + linearlyDependentPieceCount uint = 0 + ) + +OUT: + for { + select { + case <-ctx.Done(): + break OUT + + case idx := <-p.supervisorChan: + // useful when linearly dependent pieces are received + idx -= linearlyDependentPieceCount + + // --- Stage A begins --- + for j := uint(0); j < idx; j++ { + p.createWork(j, idx, p.coeffs[idx][j], SUB_AFTER_MULT) + } + + for j := uint(0); j < idx; j++ { + weight := p.coeffs[idx][j] + p.coeffs[idx][j] = 0 + + for k := j; k < p.pieceCount; k++ { + tmp := p.field.Mul(p.coeffs[j][k], weight) + p.coeffs[idx][k] = p.field.Add(p.coeffs[idx][k], tmp) + } + } + // --- Stage A ends --- + + // --- Stage B begins --- + // first column index for row `idx` + // which has non-zero field element + non_zero_idx := -1 + for j := idx; j < p.pieceCount; j++ { + if p.coeffs[idx][j] != 0 { + non_zero_idx = int(j) + break + } + } + + // if no element is found to be non-zero, + // it's a linearly dependent piece --- not useful + if non_zero_idx == -1 { + linearlyDependentPieceCount += 1 + + p.coeffs[idx] = nil + copy((p.coeffs)[idx:], (p.coeffs)[idx+1:]) + p.coeffs = (p.coeffs)[:len(p.coeffs)-1] + + continue OUT + } + // --- Stage B ends --- + + // --- Stage C begins --- + p.createWork(idx, idx, p.coeffs[idx][non_zero_idx], DIVISION) + + for k := uint(non_zero_idx); k < p.pieceCount; k++ { + p.coeffs[idx][k] = p.field.Div(p.coeffs[idx][k], p.coeffs[idx][non_zero_idx]) + } + // --- Stage C ends --- + + // --- Stage D begins --- + for j := uint(0); j < idx; j++ { + p.createWork(idx, j, p.coeffs[j][non_zero_idx], SUB_AFTER_MULT) + } + + for j := uint(0); j < idx; j++ { + weight := p.coeffs[j][non_zero_idx] + p.coeffs[j][non_zero_idx] = 0 + + for k := uint(non_zero_idx); k < p.pieceCount; k++ { + tmp := p.field.Mul(p.coeffs[idx][k], weight) + p.coeffs[j][k] = 
p.field.Add(p.coeffs[j][k], tmp)
+				}
+			}
+			// --- Stage D ends ---
+		}
+	}
+}
+
+func (p *ParallelDecoderState) AddPiece(coding_vector kodr.CodingVector, coded_data kodr.Piece) {
+	p.coeffs = append(p.coeffs, coding_vector)
+	p.coded = append(p.coded, coded_data)
+	p.receivedCount += 1
+
+	// supervisor should start working only when at least
+	// 2 coded pieces are received
+	if p.receivedCount < 2 {
+		return
+	}
+
+	// it's a blocking call, if chan is non-buffered !
+	// lets the supervisor know which coded piece index to work on
+	//
+	// -1 added due to 0-based indexing
+	p.supervisorChan <- p.receivedCount - 1
+}

From 3194d2e27aa6fc05b6a290cabac5e321e7f92b84 Mon Sep 17 00:00:00 2001
From: Anjan Roy
Date: Sat, 14 Aug 2021 08:46:42 +0530
Subject: [PATCH 02/14] worker flow

---
 matrix/parallel_decoder_state.go | 129 +++++++++++++++++++++++--------
 1 file changed, 96 insertions(+), 33 deletions(-)

diff --git a/matrix/parallel_decoder_state.go b/matrix/parallel_decoder_state.go
index 848755f..f8e5a94 100644
--- a/matrix/parallel_decoder_state.go
+++ b/matrix/parallel_decoder_state.go
@@ -12,6 +12,8 @@ type OP uint8
 const (
 	SUB_AFTER_MULT OP = iota + 1
 	DIVISION
+	// special signal denoting workers should stop working
+	STOP
 )
 
 type ParallelDecoderState struct {
@@ -19,13 +21,14 @@ type ParallelDecoderState struct {
 	// this is generation size = G
 	pieceCount uint
 	// #-of pieces received already
-	receivedCount uint
+	receivedCount uint
+	// useful piece count i.e. linearly
+	// independent pieces decoder has received
+	useful uint
 	coeffs, coded Matrix
-	workCount uint
 	workerQueue []*work
-	workerCount uint
-	workerChans []chan struct{}
-	supervisorChan chan uint
+	workerChans []chan uint64
+	supervisorChan chan *kodr.CodedPiece
 }
 
@@ -40,7 +43,7 @@ type work struct {
 }
 
 type workerState struct {
-	workerChan chan struct{}
+	workerChan chan uint64
 	decoderState *ParallelDecoderState
 	currentWorkIdx uint
 	totalWorkCount uint
@@ -50,42 +53,49 @@ type workerState struct {
 func (p *ParallelDecoderState) createWork(src, dst uint, weight byte, op OP) {
 	w := work{srcRow: src, dstRow: dst, weight: weight, op: op}
 	p.workerQueue = append(p.workerQueue, &w)
+	idx := uint(len(p.workerQueue) - 1)
+
+	for i := 0; i < len(p.workerChans); i++ {
+		// it's blocking call, better to use buffered channel,
+		// then it won't probably be !
+		p.workerChans[i] <- uint64(idx)
+	}
 }
 
 func (p *ParallelDecoderState) supervise(ctx context.Context) {
-	var (
-		linearlyDependentPieceCount uint = 0
-	)
-
 OUT:
 	for {
 		select {
 		case <-ctx.Done():
 			break OUT
 
-		case idx := <-p.supervisorChan:
-			// useful when linearly dependent pieces are received
-			idx -= linearlyDependentPieceCount
+		case codedPiece := <-p.supervisorChan:
+			// done with decoding, no need to work
+			// on new coded piece !
+			if p.useful >= p.pieceCount {
+				continue OUT
+			}
 
-			// --- Stage A begins ---
-			for j := uint(0); j < idx; j++ {
-				p.createWork(j, idx, p.coeffs[idx][j], SUB_AFTER_MULT)
-			}
+			p.coeffs = append(p.coeffs, codedPiece.Vector)
+			p.coded = append(p.coded, codedPiece.Piece)
 
+			// index of current piece of interest
+			idx := uint(len(p.coeffs) - 1)
+
+			// --- Stage A begins ---
 			for j := uint(0); j < idx; j++ {
 				weight := p.coeffs[idx][j]
-				p.coeffs[idx][j] = 0
 
-				for k := j; k < p.pieceCount; k++ {
+				for k := j + 1; k < p.pieceCount; k++ {
 					tmp := p.field.Mul(p.coeffs[j][k], weight)
 					p.coeffs[idx][k] = p.field.Add(p.coeffs[idx][k], tmp)
 				}
 			}
-			// --- Stage A ends ---
 
 			// --- Stage B begins ---
 			// first column index for row `idx`
 			// which has non-zero field element
+			// after `idx-1` column
 			non_zero_idx := -1
 			for j := idx; j < p.pieceCount; j++ {
 				if p.coeffs[idx][j] != 0 {
@@ -97,16 +107,26 @@ OUT:
 			// if no element is found to be non-zero,
 			// it's a linearly dependent piece --- not useful
 			if non_zero_idx == -1 {
-				linearlyDependentPieceCount += 1
-
 				p.coeffs[idx] = nil
 				copy((p.coeffs)[idx:], (p.coeffs)[idx+1:])
 				p.coeffs = (p.coeffs)[:len(p.coeffs)-1]
 
+				p.coded[idx] = nil
+				copy((p.coded)[idx:], (p.coded)[idx+1:])
+				p.coded = (p.coded)[:len(p.coded)-1]
+
+				p.useful = uint(len(p.coeffs))
 				continue OUT
 			}
 			// --- Stage B ends ---
 
+			for j := uint(0); j < idx; j++ {
+				weight := p.coeffs[idx][j]
+				p.coeffs[idx][j] = 0
+				p.createWork(j, idx, weight, SUB_AFTER_MULT)
+			}
+			// --- Stage A ends ---
+
 			// --- Stage C begins ---
 			p.createWork(idx, idx, p.coeffs[idx][non_zero_idx], DIVISION)
 
@@ -130,24 +150,67 @@ OUT:
 				}
 			}
 			// --- Stage D ends ---
+
+			// these many useful pieces decoder has as of now
+			p.useful = uint(len(p.coeffs))
+
+			// because decoding is complete !
+			// workers doesn't need to be alive !
+			if p.useful >= p.pieceCount {
+				p.createWork(0, 0, 0, STOP)
+			}
+
 		}
 	}
 }
 
-func (p *ParallelDecoderState) AddPiece(coding_vector kodr.CodingVector, coded_data kodr.Piece) {
-	p.coeffs = append(p.coeffs, coding_vector)
-	p.coded = append(p.coded, coded_data)
-	p.receivedCount += 1
+func (p *ParallelDecoderState) work(ctx context.Context, wState *workerState) {
+OUT:
+	for {
+		select {
+		case <-ctx.Done():
+			break OUT
+
+		case idx := <-wState.workerChan:
+			w := p.workerQueue[idx]
 
-	// supervisor should start working only when at least
-	// 2 coded pieces are received
-	if p.receivedCount < 2 {
-		return
+			switch w.op {
+			case SUB_AFTER_MULT:
+				for i := wState.columnStart; i <= wState.columnEnd; i++ {
+					tmp := p.field.Mul(p.coded[w.srcRow][i], w.weight)
+					p.coded[w.dstRow][i] = p.field.Add(p.coded[w.dstRow][i], tmp)
+				}
+
+			case DIVISION:
+				for i := wState.columnStart; i <= wState.columnEnd; i++ {
+					p.coded[w.dstRow][i] = p.field.Add(p.coded[w.srcRow][i], w.weight)
+				}
+
+			case STOP:
+				// supervisor signals decoding is complete !
+				break OUT
+
+			}
+		}
 	}
+}
 
-	// it's a blocking call, if chan is non-buffered !
-	// lets the supervisor know which coded piece index to work on
-	//
-	// -1 added due to 0-based indexing
-	p.supervisorChan <- p.receivedCount - 1
+// Adds new coded piece to decoder state, so that it can process
+// and progressively decoded pieces
+//
+// Before invoking this method, it's good idea to check
+// `IsDecoded` method & refrain from invoking if already
+// decoded
+func (p *ParallelDecoderState) AddPiece(codedPiece *kodr.CodedPiece) {
+	// it's blocking call, if chan is non-bufferred !
+	//
+	// better to use buffered channel
+	p.supervisorChan <- codedPiece
+}
+
+// If enough #-of linearly independent pieces are received
+// whole data is decoded, which denotes it's good time
+// to start consuming !
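+//
+// NOTE: `useful` is updated only by the supervisor go-routine &
+// is read here without any synchronisation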
+func (p *ParallelDecoderState) IsDecoded() bool { + return p.useful >= p.pieceCount } From 6eec5b6cc39c2b96cad9a9f0386d5b143402cf2b Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 14 Aug 2021 09:28:51 +0530 Subject: [PATCH 03/14] get piece by index --- matrix/parallel_decoder_state.go | 130 ++++++++++++++++++++++++------- 1 file changed, 102 insertions(+), 28 deletions(-) diff --git a/matrix/parallel_decoder_state.go b/matrix/parallel_decoder_state.go index f8e5a94..6917995 100644 --- a/matrix/parallel_decoder_state.go +++ b/matrix/parallel_decoder_state.go @@ -2,6 +2,7 @@ package matrix import ( "context" + "sync/atomic" "github.com/cloud9-tools/go-galoisfield" "github.com/itzmeanjan/kodr" @@ -19,22 +20,30 @@ const ( type ParallelDecoderState struct { field *galoisfield.GF // this is generation size = G - pieceCount uint + pieceCount uint64 // #-of pieces received already - receivedCount uint + receivedCount uint64 // useful piece count i.e. linearly // independent pieces decoder has received - useful uint - coeffs, coded Matrix - workerQueue []*work - workerChans []chan uint64 - supervisorChan chan *kodr.CodedPiece + useful uint64 + coeffs, coded Matrix + decoded bool + workerQueue []*work + workerChans []chan uint64 + supervisorAddPieceChan chan *kodr.CodedPiece + supervisorGetPieceChan chan *pieceRequest +} + +type pieceRequest struct { + idx uint64 + resp chan *kodr.Piece + err chan error } type work struct { - // 0-based work index which monotonically increases - idx uint - srcRow, dstRow uint + // which two rows of coded data matrix are involved + // in this row operation + srcRow, dstRow uint64 // weight is element of coefficient // matrix i.e. field element weight byte @@ -44,13 +53,10 @@ type work struct { type workerState struct { workerChan chan uint64 - decoderState *ParallelDecoderState - currentWorkIdx uint - totalWorkCount uint columnStart, columnEnd uint } -func (p *ParallelDecoderState) createWork(src, dst uint, weight byte, op OP) { +func (p *ParallelDecoderState) createWork(src, dst uint64, weight byte, op OP) { w := work{srcRow: src, dstRow: dst, weight: weight, op: op} p.workerQueue = append(p.workerQueue, &w) idx := uint(len(p.workerQueue) - 1) @@ -69,10 +75,10 @@ OUT: case <-ctx.Done(): break OUT - case codedPiece := <-p.supervisorChan: + case codedPiece := <-p.supervisorAddPieceChan: // done with decoding, no need to work // on new coded piece ! 
- if p.useful >= p.pieceCount { + if p.IsDecoded() { continue OUT } @@ -80,10 +86,10 @@ OUT: p.coded = append(p.coded, codedPiece.Piece) // index of current piece of interest - idx := uint(len(p.coeffs) - 1) + idx := uint64(len(p.coeffs) - 1) // --- Stage A begins --- - for j := uint(0); j < idx; j++ { + for j := uint64(0); j < idx; j++ { weight := p.coeffs[idx][j] for k := j + 1; k < p.pieceCount; k++ { @@ -115,12 +121,12 @@ OUT: copy((p.coded)[idx:], (p.coded)[idx+1:]) p.coded = (p.coded)[:len(p.coded)-1] - p.useful = uint(len(p.coeffs)) + atomic.StoreUint64(&p.useful, uint64(p.coeffs.Rows())) continue OUT } // --- Stage B ends --- - for j := uint(0); j < idx; j++ { + for j := uint64(0); j < idx; j++ { weight := p.coeffs[idx][j] p.coeffs[idx][j] = 0 p.createWork(j, idx, weight, SUB_AFTER_MULT) @@ -130,21 +136,21 @@ OUT: // --- Stage C begins --- p.createWork(idx, idx, p.coeffs[idx][non_zero_idx], DIVISION) - for k := uint(non_zero_idx); k < p.pieceCount; k++ { + for k := uint64(non_zero_idx); k < p.pieceCount; k++ { p.coeffs[idx][k] = p.field.Div(p.coeffs[idx][k], p.coeffs[idx][non_zero_idx]) } // --- Stage C ends --- // --- Stage D begins --- - for j := uint(0); j < idx; j++ { + for j := uint64(0); j < idx; j++ { p.createWork(idx, j, p.coeffs[j][non_zero_idx], SUB_AFTER_MULT) } - for j := uint(0); j < idx; j++ { + for j := uint64(0); j < idx; j++ { weight := p.coeffs[j][non_zero_idx] p.coeffs[j][non_zero_idx] = 0 - for k := uint(non_zero_idx); k < p.pieceCount; k++ { + for k := uint64(non_zero_idx); k < p.pieceCount; k++ { tmp := p.field.Mul(p.coeffs[idx][k], weight) p.coeffs[j][k] = p.field.Add(p.coeffs[j][k], tmp) } @@ -152,14 +158,59 @@ OUT: // --- Stage D ends --- // these many useful pieces decoder has as of now - p.useful = uint(len(p.coeffs)) + atomic.StoreUint64(&p.useful, uint64(p.coeffs.Rows())) // because decoding is complete ! // workers doesn't need to be alive ! - if p.useful >= p.pieceCount { + if p.IsDecoded() { p.createWork(0, 0, 0, STOP) } + case req := <-p.supervisorGetPieceChan: + if req.idx >= p.pieceCount { + req.err <- kodr.ErrPieceOutOfBound + continue OUT + } + + if req.idx >= uint64(p.coeffs.Rows()) { + req.err <- kodr.ErrPieceNotDecodedYet + continue OUT + } + + if p.IsDecoded() { + req.resp <- (*kodr.Piece)(&p.coded[req.idx]) + continue OUT + } + + cols := uint64(p.coeffs.Cols()) + decoded := true + + NESTED: + for i := uint64(0); i < cols; i++ { + switch i { + case req.idx: + if p.coeffs[req.idx][i] != 1 { + decoded = false + break NESTED + } + + default: + if p.coeffs[req.idx][i] != 0 { + decoded = false + break NESTED + } + + } + } + + if !decoded { + req.err <- kodr.ErrPieceNotDecodedYet + continue OUT + } + + req.resp <- (*kodr.Piece)(&p.coded[req.idx]) + continue OUT + } } } @@ -201,16 +252,39 @@ OUT: // Before invoking this method, it's good idea to check // `IsDecoded` method & refrain from invoking if already // decoded +// +// It's concurrent safe ! func (p *ParallelDecoderState) AddPiece(codedPiece *kodr.CodedPiece) { // it's blocking call, if chan is non-bufferred ! // // better to use buffered channel - p.supervisorChan <- codedPiece + p.supervisorAddPieceChan <- codedPiece } // If enough #-of linearly independent pieces are received // whole data is decoded, which denotes it's good time // to start consuming ! +// +// It's concurrent safe ! 
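+// ( the atomic load pairs with the supervisor's atomic
+// store of `useful` )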
func (p *ParallelDecoderState) IsDecoded() bool { - return p.useful >= p.pieceCount + return atomic.LoadUint64(&p.useful) >= p.pieceCount +} + +// Fetch decoded piece by index, can also return piece when not fully +// decoded, given requested piece is decoded +func (p *ParallelDecoderState) GetPiece(idx uint64) (kodr.Piece, error) { + respChan := make(chan *kodr.Piece, 1) + errChan := make(chan error, 1) + req := pieceRequest{idx: idx, resp: respChan, err: errChan} + + // this may block ! + p.supervisorGetPieceChan <- &req + + // waiting for response ! + select { + case err := <-errChan: + return nil, err + case piece := <-respChan: + return *piece, nil + } } From 3e1af9c563b309cbd808013f29f7ee3a95722cbb Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 14 Aug 2021 11:41:56 +0530 Subject: [PATCH 04/14] new parallel decoder state --- matrix/parallel_decoder_state.go | 98 ++++++++++++++++++++++++++++++-- 1 file changed, 94 insertions(+), 4 deletions(-) diff --git a/matrix/parallel_decoder_state.go b/matrix/parallel_decoder_state.go index 6917995..bcb1fee 100644 --- a/matrix/parallel_decoder_state.go +++ b/matrix/parallel_decoder_state.go @@ -2,6 +2,7 @@ package matrix import ( "context" + "runtime" "sync/atomic" "github.com/cloud9-tools/go-galoisfield" @@ -21,19 +22,19 @@ type ParallelDecoderState struct { field *galoisfield.GF // this is generation size = G pieceCount uint64 - // #-of pieces received already - receivedCount uint64 + // length of coded data part of coded piece + pieceLen uint64 // useful piece count i.e. linearly // independent pieces decoder has received useful uint64 coeffs, coded Matrix - decoded bool workerQueue []*work workerChans []chan uint64 supervisorAddPieceChan chan *kodr.CodedPiece supervisorGetPieceChan chan *pieceRequest } +// decoded piece consumption request type pieceRequest struct { idx uint64 resp chan *kodr.Piece @@ -53,7 +54,7 @@ type work struct { type workerState struct { workerChan chan uint64 - columnStart, columnEnd uint + columnStart, columnEnd uint64 } func (p *ParallelDecoderState) createWork(src, dst uint64, weight byte, op OP) { @@ -76,6 +77,10 @@ OUT: break OUT case codedPiece := <-p.supervisorAddPieceChan: + if codedPiece.Len() != uint(p.pieceCount+p.pieceLen) { + continue OUT + } + // done with decoding, no need to work // on new coded piece ! if p.IsDecoded() { @@ -288,3 +293,88 @@ func (p *ParallelDecoderState) GetPiece(idx uint64) (kodr.Piece, error) { return *piece, nil } } + +// Current state of coding coefficient matrix +// +// NOTE: Don't mutate matrix, use only for writing test cases ! +func (p *ParallelDecoderState) CoefficientMatrix() Matrix { + return p.coeffs +} + +// Current state of coded piece matrix, which is updated +// along side coding coefficient matrix ( during parallel rref ) +// +// NOTE: Don't mutate matrix, use only for writing test cases ! 
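+//
+// Rows get appended ( & removed, on linear dependence ) in
+// lock-step with the coefficient matrix, so both matrices
+// always hold the same number of rows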
+func (p *ParallelDecoderState) CodedPieceMatrix() Matrix { + return p.coded +} + +// Each worker must at least take responsibility of +// 2-bytes slice of coded data & each of these +// worker slices are non-overlapping +func workerCount_(pieceLen uint64) uint64 { + // it's actually double of available CPU count + cpus := uint64(runtime.NumCPU()) << 1 + if pieceLen/cpus > 1 { + return cpus + } + + return cpus >> 1 +} + +// Splitting coded data matrix mutation responsibility among workers +// Each of these slices are non-overlapping +// +// workerCount = 0 denotes user wants to go with `kodr` chosen value +func splitWork(workerCount, pieceLen uint64) []*workerState { + if workerCount == 0 { + workerCount = workerCount_(pieceLen) + } + + span := pieceLen / workerCount + workers := make([]*workerState, 0, workerCount) + for i := uint64(0); i < workerCount; i++ { + start := span * i + end := span*(i+1) - 1 + if i == workerCount-1 { + end = pieceLen - 1 + } + + ws := workerState{ + workerChan: make(chan uint64, workerCount), + columnStart: start, + columnEnd: end, + } + workers = append(workers, &ws) + } + return workers +} + +func NewParallelDecoderState(ctx context.Context, pieceCount, pieceLen uint64) *ParallelDecoderState { + splitted := splitWork(0, pieceLen) + + dec := ParallelDecoderState{ + field: galoisfield.DefaultGF256, + pieceCount: pieceCount, + pieceLen: pieceLen, + coeffs: make([][]byte, 0, pieceCount), + coded: make([][]byte, 0, pieceCount), + workerQueue: make([]*work, 0), + supervisorAddPieceChan: make(chan *kodr.CodedPiece, pieceCount), + supervisorGetPieceChan: make(chan *pieceRequest, 1), + } + + workerChans := make([]chan uint64, 0, len(splitted)) + for i := 0; i < len(splitted); i++ { + func(idx int) { + workerChans = append(workerChans, splitted[i].workerChan) + // each worker runs on its own go-routine + go dec.work(ctx, splitted[idx]) + }(i) + } + + dec.workerChans = workerChans + // supervisor runs on its own go-routine + go dec.supervise(ctx) + return &dec +} From a15239cb224fc7c9a67b1ebda4b44c7435b68997 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 14 Aug 2021 11:56:00 +0530 Subject: [PATCH 05/14] worker count <= 2 * #-of available cores on machine --- matrix/parallel_decoder_state.go | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/matrix/parallel_decoder_state.go b/matrix/parallel_decoder_state.go index bcb1fee..88fcc3f 100644 --- a/matrix/parallel_decoder_state.go +++ b/matrix/parallel_decoder_state.go @@ -310,38 +310,32 @@ func (p *ParallelDecoderState) CodedPieceMatrix() Matrix { } // Each worker must at least take responsibility of -// 2-bytes slice of coded data & each of these +// 8-bytes slice of coded data & each of these // worker slices are non-overlapping -func workerCount_(pieceLen uint64) uint64 { - // it's actually double of available CPU count +func workerCount(pieceLen uint64) uint64 { + wcount := pieceLen / 8 cpus := uint64(runtime.NumCPU()) << 1 - if pieceLen/cpus > 1 { + if wcount > cpus { return cpus } - - return cpus >> 1 + return wcount } // Splitting coded data matrix mutation responsibility among workers // Each of these slices are non-overlapping -// -// workerCount = 0 denotes user wants to go with `kodr` chosen value -func splitWork(workerCount, pieceLen uint64) []*workerState { - if workerCount == 0 { - workerCount = workerCount_(pieceLen) - } - - span := pieceLen / workerCount - workers := make([]*workerState, 0, workerCount) - for i := uint64(0); i < workerCount; i++ { +func 
splitWork(pieceLen uint64) []*workerState { + wcount := workerCount(pieceLen) + span := pieceLen / wcount + workers := make([]*workerState, 0, wcount) + for i := uint64(0); i < wcount; i++ { start := span * i end := span*(i+1) - 1 - if i == workerCount-1 { + if i == wcount-1 { end = pieceLen - 1 } ws := workerState{ - workerChan: make(chan uint64, workerCount), + workerChan: make(chan uint64, wcount), columnStart: start, columnEnd: end, } @@ -351,7 +345,7 @@ func splitWork(workerCount, pieceLen uint64) []*workerState { } func NewParallelDecoderState(ctx context.Context, pieceCount, pieceLen uint64) *ParallelDecoderState { - splitted := splitWork(0, pieceLen) + splitted := splitWork(pieceLen) dec := ParallelDecoderState{ field: galoisfield.DefaultGF256, From 6a428d2a767ea3a8bd7b2fdfe046d432cc6cfa29 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 14 Aug 2021 12:05:13 +0530 Subject: [PATCH 06/14] return error indication when adding coded piece to parallel decoder --- errors.go | 1 + matrix/parallel_decoder_state.go | 29 ++++++++++++++++++++--------- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/errors.go b/errors.go index a745361..0137971 100644 --- a/errors.go +++ b/errors.go @@ -14,4 +14,5 @@ var ( ErrCodingVectorLengthMismatch = errors.New("coding vector length > coded piece length ( in total )") ErrPieceNotDecodedYet = errors.New("piece not decoded yet, more pieces required") ErrPieceOutOfBound = errors.New("requested piece index >= pieceCount ( pieces coded together )") + ErrCodedPieceSizeMismatch = errors.New("received coded piece's size != expected size") ) diff --git a/matrix/parallel_decoder_state.go b/matrix/parallel_decoder_state.go index 88fcc3f..587593d 100644 --- a/matrix/parallel_decoder_state.go +++ b/matrix/parallel_decoder_state.go @@ -30,10 +30,15 @@ type ParallelDecoderState struct { coeffs, coded Matrix workerQueue []*work workerChans []chan uint64 - supervisorAddPieceChan chan *kodr.CodedPiece + supervisorAddPieceChan chan *addRequest supervisorGetPieceChan chan *pieceRequest } +type addRequest struct { + piece *kodr.CodedPiece + err chan error +} + // decoded piece consumption request type pieceRequest struct { idx uint64 @@ -76,17 +81,22 @@ OUT: case <-ctx.Done(): break OUT - case codedPiece := <-p.supervisorAddPieceChan: - if codedPiece.Len() != uint(p.pieceCount+p.pieceLen) { + case req := <-p.supervisorAddPieceChan: + if req.piece.Len() != uint(p.pieceCount+p.pieceLen) { + req.err <- kodr.ErrCodedPieceSizeMismatch continue OUT } // done with decoding, no need to work // on new coded piece ! if p.IsDecoded() { + req.err <- kodr.ErrAllUsefulPiecesReceived continue OUT } + // piece to be processed further, returning nil error ! + req.err <- nil + codedPiece := req.piece p.coeffs = append(p.coeffs, codedPiece.Vector) p.coded = append(p.coded, codedPiece.Piece) @@ -259,11 +269,12 @@ OUT: // decoded // // It's concurrent safe ! -func (p *ParallelDecoderState) AddPiece(codedPiece *kodr.CodedPiece) { - // it's blocking call, if chan is non-bufferred ! 
- // - // better to use buffered channel - p.supervisorAddPieceChan <- codedPiece +func (p *ParallelDecoderState) AddPiece(codedPiece *kodr.CodedPiece) error { + errChan := make(chan error, 1) + req := addRequest{piece: codedPiece, err: errChan} + p.supervisorAddPieceChan <- &req + + return <-errChan } // If enough #-of linearly independent pieces are received @@ -354,7 +365,7 @@ func NewParallelDecoderState(ctx context.Context, pieceCount, pieceLen uint64) * coeffs: make([][]byte, 0, pieceCount), coded: make([][]byte, 0, pieceCount), workerQueue: make([]*work, 0), - supervisorAddPieceChan: make(chan *kodr.CodedPiece, pieceCount), + supervisorAddPieceChan: make(chan *addRequest, pieceCount), supervisorGetPieceChan: make(chan *pieceRequest, 1), } From 1fea7a76995a8f44ff9aec7820786bb7f881f30b Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 14 Aug 2021 12:12:01 +0530 Subject: [PATCH 07/14] received piece count > 2, only then work starts --- matrix/parallel_decoder_state.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/matrix/parallel_decoder_state.go b/matrix/parallel_decoder_state.go index 587593d..8d91491 100644 --- a/matrix/parallel_decoder_state.go +++ b/matrix/parallel_decoder_state.go @@ -75,6 +75,9 @@ func (p *ParallelDecoderState) createWork(src, dst uint64, weight byte, op OP) { } func (p *ParallelDecoderState) supervise(ctx context.Context) { + // how many pieces received + receivedCount := 0 + OUT: for { select { @@ -96,10 +99,16 @@ OUT: // piece to be processed further, returning nil error ! req.err <- nil + receivedCount++ codedPiece := req.piece p.coeffs = append(p.coeffs, codedPiece.Vector) p.coded = append(p.coded, codedPiece.Piece) + // minimum 2 pieces are requyired to start working + if receivedCount < 2 { + continue OUT + } + // index of current piece of interest idx := uint64(len(p.coeffs) - 1) From ffdfb1c6f3e7319822e23af72d39560ba0e0af14 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 14 Aug 2021 15:13:42 +0530 Subject: [PATCH 08/14] fix worker implementation for DIVISION task --- matrix/parallel_decoder_state.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/matrix/parallel_decoder_state.go b/matrix/parallel_decoder_state.go index 8d91491..ae0dd4a 100644 --- a/matrix/parallel_decoder_state.go +++ b/matrix/parallel_decoder_state.go @@ -104,11 +104,6 @@ OUT: p.coeffs = append(p.coeffs, codedPiece.Vector) p.coded = append(p.coded, codedPiece.Piece) - // minimum 2 pieces are requyired to start working - if receivedCount < 2 { - continue OUT - } - // index of current piece of interest idx := uint64(len(p.coeffs) - 1) @@ -158,10 +153,11 @@ OUT: // --- Stage A ends --- // --- Stage C begins --- - p.createWork(idx, idx, p.coeffs[idx][non_zero_idx], DIVISION) + weight := p.coeffs[idx][non_zero_idx] + p.createWork(idx, idx, weight, DIVISION) for k := uint64(non_zero_idx); k < p.pieceCount; k++ { - p.coeffs[idx][k] = p.field.Div(p.coeffs[idx][k], p.coeffs[idx][non_zero_idx]) + p.coeffs[idx][k] = p.field.Div(p.coeffs[idx][k], weight) } // --- Stage C ends --- @@ -174,7 +170,7 @@ OUT: weight := p.coeffs[j][non_zero_idx] p.coeffs[j][non_zero_idx] = 0 - for k := uint64(non_zero_idx); k < p.pieceCount; k++ { + for k := uint64(non_zero_idx + 1); k < p.pieceCount; k++ { tmp := p.field.Mul(p.coeffs[idx][k], weight) p.coeffs[j][k] = p.field.Add(p.coeffs[j][k], tmp) } @@ -258,7 +254,7 @@ OUT: case DIVISION: for i := wState.columnStart; i <= wState.columnEnd; i++ { - p.coded[w.dstRow][i] = p.field.Add(p.coded[w.srcRow][i], 
w.weight) + p.coded[w.dstRow][i] = p.field.Div(p.coded[w.srcRow][i], w.weight) } case STOP: @@ -329,6 +325,13 @@ func (p *ParallelDecoderState) CodedPieceMatrix() Matrix { return p.coded } +func max(a, b uint64) uint64 { + if a >= b { + return a + } + return b +} + // Each worker must at least take responsibility of // 8-bytes slice of coded data & each of these // worker slices are non-overlapping @@ -338,7 +341,7 @@ func workerCount(pieceLen uint64) uint64 { if wcount > cpus { return cpus } - return wcount + return max(wcount, 1) } // Splitting coded data matrix mutation responsibility among workers From 9d9f39c9c32ed5d19503e917389642ff7f4f8faf Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 14 Aug 2021 18:55:30 +0530 Subject: [PATCH 09/14] supervisor waits for confirmation of workers before deciding whether decoded fully or not --- matrix/parallel_decoder_state.go | 111 +++++++++++++++++++------------ 1 file changed, 69 insertions(+), 42 deletions(-) diff --git a/matrix/parallel_decoder_state.go b/matrix/parallel_decoder_state.go index ae0dd4a..1df3e74 100644 --- a/matrix/parallel_decoder_state.go +++ b/matrix/parallel_decoder_state.go @@ -26,12 +26,15 @@ type ParallelDecoderState struct { pieceLen uint64 // useful piece count i.e. linearly // independent pieces decoder has received - useful uint64 - coeffs, coded Matrix - workerQueue []*work - workerChans []chan uint64 - supervisorAddPieceChan chan *addRequest - supervisorGetPieceChan chan *pieceRequest + useful uint64 + coeffs, coded Matrix + workerQueue []*work + workerChans []chan uint64 + supervisorAddPieceChan chan *addRequest + supervisorGetPieceChan chan *pieceRequest + workerCompletedReportChan chan struct{} + workerCompletedCount uint64 + workerCount uint64 } type addRequest struct { @@ -74,7 +77,9 @@ func (p *ParallelDecoderState) createWork(src, dst uint64, weight byte, op OP) { } } -func (p *ParallelDecoderState) supervise(ctx context.Context) { +func (p *ParallelDecoderState) supervise(ctx context.Context, cnfChan chan struct{}) { + // confirming worker is ready to run ! + cnfChan <- struct{}{} // how many pieces received receivedCount := 0 @@ -92,7 +97,7 @@ OUT: // done with decoding, no need to work // on new coded piece ! - if p.IsDecoded() { + if atomic.LoadUint64(&p.useful) >= p.pieceCount { req.err <- kodr.ErrAllUsefulPiecesReceived continue OUT } @@ -118,20 +123,11 @@ OUT: } // --- Stage B begins --- - // first column index for row `idx` - // which has non-zero field element - // after `idx-1` column - non_zero_idx := -1 - for j := idx; j < p.pieceCount; j++ { - if p.coeffs[idx][j] != 0 { - non_zero_idx = int(j) - break - } - } - - // if no element is found to be non-zero, - // it's a linearly dependent piece --- not useful - if non_zero_idx == -1 { + non_zero_idx := idx + pivot := p.coeffs[idx][non_zero_idx] + // pivot must be non-zero, linear dependency found, + // so discard this piece + if pivot == 0 { p.coeffs[idx] = nil copy((p.coeffs)[idx:], (p.coeffs)[idx+1:]) p.coeffs = (p.coeffs)[:len(p.coeffs)-1] @@ -182,10 +178,17 @@ OUT: // because decoding is complete ! // workers doesn't need to be alive ! - if p.IsDecoded() { + if atomic.LoadUint64(&p.useful) >= p.pieceCount { p.createWork(0, 0, 0, STOP) } + case <-p.workerCompletedReportChan: + // workers must confirm they've completed + // all tasks delegated to them + // + // which finally denotes it's good time to decode ! 
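			// a worker sends exactly one such report, just before
			// exiting on STOP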
			atomic.AddUint64(&p.workerCompletedCount, 1)
+
 		case req := <-p.supervisorGetPieceChan:
 			if req.idx >= p.pieceCount {
 				req.err <- kodr.ErrPieceOutOfBound
 				continue OUT
 
-func (p *ParallelDecoderState) work(ctx context.Context, wState *workerState) {
+func (p *ParallelDecoderState) work(ctx context.Context, wState *workerState, cnfChan chan struct{}) {
+	// confirming worker is ready to run !
+	cnfChan <- struct{}{}
+
 OUT:
 	for {
 		select {
 
 			case STOP:
 				// supervisor signals decoding is complete !
+				// worker also confirms it's done
+				p.workerCompletedReportChan <- struct{}{}
 				break OUT
 
 // It's concurrent safe !
 func (p *ParallelDecoderState) IsDecoded() bool {
-	return atomic.LoadUint64(&p.useful) >= p.pieceCount
+	return atomic.LoadUint64(&p.useful) >= p.pieceCount && atomic.LoadUint64(&p.workerCompletedCount) >= p.workerCount
 }
 
 // Each worker must at least take responsibility of
-// 8-bytes slice of coded data & each of these
+// 32-bytes slice of coded data & each of these
 // worker slices are non-overlapping
+//
+// Can allocate at max #-of available CPU * 2 go-routines
 func workerCount(pieceLen uint64) uint64 {
-	wcount := pieceLen / 8
+	wcount := pieceLen / (1 << 5)
 	cpus := uint64(runtime.NumCPU()) << 1
 	if wcount > cpus {
 		return cpus
 	}
 	return max(wcount, 1)
 }
 
 // Splitting coded data matrix mutation responsibility among workers
 // Each of these slices are non-overlapping
-func splitWork(pieceLen uint64) []*workerState {
+func splitWork(pieceLen, pieceCount uint64) []*workerState {
 	wcount := workerCount(pieceLen)
 	span := pieceLen / wcount
 	workers := make([]*workerState, 0, wcount)
 	for i := uint64(0); i < wcount; i++ {
 		start := span * i
 		end := span*(i+1) - 1
 		if i == wcount-1 {
 			end = pieceLen - 1
 		}
 
 		ws := workerState{
-			workerChan: make(chan uint64, wcount),
+			workerChan: make(chan uint64, pieceCount),
 			columnStart: start,
 			columnEnd: end,
 		}
 		workers = append(workers, &ws)
 	}
 	return workers
 }
 
 func NewParallelDecoderState(ctx context.Context, pieceCount, pieceLen uint64) *ParallelDecoderState {
-	splitted := splitWork(pieceLen)
+	splitted := splitWork(pieceLen, pieceCount)
+	wc := len(splitted)
 
 	dec := ParallelDecoderState{
-		field: galoisfield.DefaultGF256,
-		pieceCount: pieceCount,
-		pieceLen: pieceLen,
-		coeffs: make([][]byte, 0, pieceCount),
-		coded: make([][]byte, 0, pieceCount),
-		workerQueue: make([]*work, 0),
-		supervisorAddPieceChan: make(chan *addRequest, pieceCount),
-		supervisorGetPieceChan: make(chan *pieceRequest, 1),
+		field: galoisfield.DefaultGF256,
+		pieceCount: pieceCount,
+		pieceLen: pieceLen,
+		coeffs: make([][]byte, 0, pieceCount),
+		coded: make([][]byte, 0, pieceCount),
+		workerQueue: make([]*work, 0),
+		supervisorAddPieceChan: make(chan *addRequest, pieceCount),
+		supervisorGetPieceChan: make(chan *pieceRequest, 1),
+		workerCompletedReportChan: make(chan struct{}, wc),
+		workerCount: uint64(wc),
 	}
 
+	// wc + 1 because those many go-routines to be
+	// run for decoding i.e. 
(1 supervisor + wc-workers) + cnfChan := make(chan struct{}, wc+1) + + workerChans := make([]chan uint64, 0, wc) + for i := 0; i < wc; i++ { func(idx int) { workerChans = append(workerChans, splitted[i].workerChan) // each worker runs on its own go-routine - go dec.work(ctx, splitted[idx]) + go dec.work(ctx, splitted[idx], cnfChan) }(i) } dec.workerChans = workerChans // supervisor runs on its own go-routine - go dec.supervise(ctx) + go dec.supervise(ctx, cnfChan) + + // wait for all components to start working ! + running := 0 + for range cnfChan { + running++ + if running >= wc+1 { + break + } + } + return &dec } From cd192ed24d36c848e4db97bb0317e6c305464010 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 14 Aug 2021 19:38:45 +0530 Subject: [PATCH 10/14] pass as `slice`, `*slice` not needed --- matrix/parallel_decoder_state.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/matrix/parallel_decoder_state.go b/matrix/parallel_decoder_state.go index 1df3e74..1e27833 100644 --- a/matrix/parallel_decoder_state.go +++ b/matrix/parallel_decoder_state.go @@ -45,7 +45,7 @@ type addRequest struct { // decoded piece consumption request type pieceRequest struct { idx uint64 - resp chan *kodr.Piece + resp chan kodr.Piece err chan error } @@ -201,7 +201,7 @@ OUT: } if p.IsDecoded() { - req.resp <- (*kodr.Piece)(&p.coded[req.idx]) + req.resp <- p.coded[req.idx] continue OUT } @@ -231,7 +231,7 @@ OUT: continue OUT } - req.resp <- (*kodr.Piece)(&p.coded[req.idx]) + req.resp <- p.coded[req.idx] continue OUT } @@ -302,7 +302,7 @@ func (p *ParallelDecoderState) IsDecoded() bool { // Fetch decoded piece by index, can also return piece when not fully // decoded, given requested piece is decoded func (p *ParallelDecoderState) GetPiece(idx uint64) (kodr.Piece, error) { - respChan := make(chan *kodr.Piece, 1) + respChan := make(chan kodr.Piece, 1) errChan := make(chan error, 1) req := pieceRequest{idx: idx, resp: respChan, err: errChan} @@ -314,7 +314,7 @@ func (p *ParallelDecoderState) GetPiece(idx uint64) (kodr.Piece, error) { case err := <-errChan: return nil, err case piece := <-respChan: - return *piece, nil + return piece, nil } } From b0e8ab09ee360b838084b12f364ff4be6e57f50d Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 14 Aug 2021 19:39:25 +0530 Subject: [PATCH 11/14] test case for parallel decoder --- matrix/parallel_decoder_state_test.go | 75 +++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 matrix/parallel_decoder_state_test.go diff --git a/matrix/parallel_decoder_state_test.go b/matrix/parallel_decoder_state_test.go new file mode 100644 index 0000000..0069bc3 --- /dev/null +++ b/matrix/parallel_decoder_state_test.go @@ -0,0 +1,75 @@ +package matrix_test + +import ( + "bytes" + "context" + "errors" + "math/rand" + "testing" + "time" + + "github.com/itzmeanjan/kodr" + "github.com/itzmeanjan/kodr/full" + "github.com/itzmeanjan/kodr/matrix" +) + +// Generates `N`-bytes of random data from default +// randomization source +func generateData(n uint) []byte { + data := make([]byte, n) + // can safely ignore error + rand.Read(data) + return data +} + +// Generates N-many pieces each of M-bytes length, to be used +// for testing purposes +func generatePieces(pieceCount uint, pieceLength uint) []kodr.Piece { + pieces := make([]kodr.Piece, 0, pieceCount) + for i := 0; i < int(pieceCount); i++ { + pieces = append(pieces, generateData(pieceLength)) + } + return pieces +} + +func TestParallelDecoderState(t *testing.T) { + 
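	// seeding makes the simulated piece drops below vary
	// from run to run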
rand.Seed(time.Now().UnixNano()) + + var ( + pieceCount uint64 = 1 << 8 + pieceLen uint64 = 1 << 12 + ) + + original_pieces := generatePieces(uint(pieceCount), uint(pieceLen)) + enc := full.NewFullRLNCEncoder(original_pieces) + + ctx := context.Background() + dec_state := matrix.NewParallelDecoderState(ctx, pieceCount, pieceLen) + + start := time.Now() + for !dec_state.IsDecoded() { + c_piece := enc.CodedPiece() + // simulate pieces being dropped ! + if rand.Intn(2) == 0 { + continue + } + + if err := dec_state.AddPiece(c_piece); err != nil && errors.Is(err, kodr.ErrAllUsefulPiecesReceived) { + break + } + } + + t.Logf("decoding completed in %s\n", time.Since(start)) + t.Logf("crunched %d bytes of data\n", enc.DecodableLen()) + + for i := uint64(0); i < pieceCount; i++ { + d_piece, err := dec_state.GetPiece(i) + if err != nil { + t.Fatalf("Error: %s\n", err.Error()) + } + + if !bytes.Equal(original_pieces[i], d_piece) { + t.Logf("decoded one doesn't match with original one for piece %d\n", i) + } + } +} From 5ff95c5d8721cf022e94c023364731292444e725 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Wed, 18 Aug 2021 21:15:29 +0530 Subject: [PATCH 12/14] benchmark parallel progressive rlnc decoder --- matrix/parallel_decoder_state_bench_test.go | 184 ++++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 matrix/parallel_decoder_state_bench_test.go diff --git a/matrix/parallel_decoder_state_bench_test.go b/matrix/parallel_decoder_state_bench_test.go new file mode 100644 index 0000000..807647f --- /dev/null +++ b/matrix/parallel_decoder_state_bench_test.go @@ -0,0 +1,184 @@ +package matrix_test + +import ( + "context" + "errors" + "math/rand" + "testing" + "time" + + "github.com/itzmeanjan/kodr" + "github.com/itzmeanjan/kodr/full" + "github.com/itzmeanjan/kodr/matrix" +) + +func copyCodedPieces(c_pieces []*kodr.CodedPiece) []*kodr.CodedPiece { + copied := make([]*kodr.CodedPiece, 0, len(c_pieces)) + + for i := 0; i < len(c_pieces); i++ { + v_len := len(c_pieces[i].Vector) + flat := c_pieces[i].Flatten() + _piece := kodr.CodedPiece{Vector: flat[:v_len], Piece: flat[v_len:]} + copied = append(copied, &_piece) + } + + return copied +} + +func try_decode(b *testing.B, pieceCount, pieceLen uint64, coded_pieces []*kodr.CodedPiece) { + ctx := context.Background() + dec_state := matrix.NewParallelDecoderState(ctx, pieceCount, pieceLen) + + for idx := uint(0); ; idx++ { + if err := dec_state.AddPiece(coded_pieces[idx]); errors.Is(err, kodr.ErrAllUsefulPiecesReceived) { + break + } + } +} + +func decoder_flow(b *testing.B, pieceCount, pieceLen, codedPieceCount uint64) { + original_pieces := generatePieces(uint(pieceCount), uint(pieceLen)) + enc := full.NewFullRLNCEncoder(original_pieces) + + coded_pieces := make([]*kodr.CodedPiece, 0, codedPieceCount) + for i := uint64(0); i < codedPieceCount; i++ { + coded_pieces = append(coded_pieces, enc.CodedPiece()) + } + + b.ResetTimer() + b.SetBytes(int64(pieceCount+pieceLen) * int64(pieceCount)) + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + // don't record these allocations + // + // because these are not necessarily + // required to be captured for this benchmark ! + b.StopTimer() + copied := copyCodedPieces(coded_pieces) + b.StartTimer() + + // record this ! 
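		// i.e. keep feeding coded pieces until the decoder
		// reports it has received enough useful ones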
try_decode(b, pieceCount, pieceLen, copied)
	}
}

func BenchmarkParallelDecoderState(b *testing.B) {
	rand.Seed(time.Now().UnixNano())

	var (
		extraPieceCount uint64 = 1 << 4
	)

	b.Run("16Pieces", func(b *testing.B) {
		var (
			pieceCount      uint64 = 1 << 4
			codedPieceCount uint64 = pieceCount + extraPieceCount
		)

		b.Run("each_1kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<10, codedPieceCount) })
		b.Run("each_2kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<11, codedPieceCount) })
		b.Run("each_4kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<12, codedPieceCount) })
		b.Run("each_8kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<13, codedPieceCount) })
		b.Run("each_16kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<14, codedPieceCount) })
		b.Run("each_32kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<15, codedPieceCount) })
		b.Run("each_64kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<16, codedPieceCount) })
		b.Run("each_128kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<17, codedPieceCount) })
		b.Run("each_256kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<18, codedPieceCount) })
		b.Run("each_512kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<19, codedPieceCount) })
	})

	b.Run("32Pieces", func(b *testing.B) {
		var (
			pieceCount      uint64 = 1 << 5
			codedPieceCount uint64 = pieceCount + extraPieceCount
		)

		b.Run("each_1kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<10, codedPieceCount) })
		b.Run("each_2kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<11, codedPieceCount) })
		b.Run("each_4kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<12, codedPieceCount) })
		b.Run("each_8kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<13, codedPieceCount) })
		b.Run("each_16kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<14, codedPieceCount) })
		b.Run("each_32kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<15, codedPieceCount) })
		b.Run("each_64kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<16, codedPieceCount) })
		b.Run("each_128kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<17, codedPieceCount) })
		b.Run("each_256kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<18, codedPieceCount) })
		b.Run("each_512kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<19, codedPieceCount) })
	})

	b.Run("64Pieces", func(b *testing.B) {
		var (
			pieceCount      uint64 = 1 << 6
			codedPieceCount uint64 = pieceCount + extraPieceCount
		)

		b.Run("each_1kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<10, codedPieceCount) })
		b.Run("each_2kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<11, codedPieceCount) })
		b.Run("each_4kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<12, codedPieceCount) })
		b.Run("each_8kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<13, codedPieceCount) })
		b.Run("each_16kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<14, codedPieceCount) })
		b.Run("each_32kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<15, codedPieceCount) })
		b.Run("each_64kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<16, codedPieceCount) })
		b.Run("each_128kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<17, codedPieceCount) })
		b.Run("each_256kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<18, codedPieceCount) })
		b.Run("each_512kB", 
func(b *testing.B) { decoder_flow(b, pieceCount, 1<<19, codedPieceCount) })
	})

	b.Run("128Pieces", func(b *testing.B) {
		var (
			pieceCount      uint64 = 1 << 7
			codedPieceCount uint64 = pieceCount + extraPieceCount
		)

		b.Run("each_1kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<10, codedPieceCount) })
		b.Run("each_2kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<11, codedPieceCount) })
		b.Run("each_4kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<12, codedPieceCount) })
		b.Run("each_8kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<13, codedPieceCount) })
		b.Run("each_16kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<14, codedPieceCount) })
		b.Run("each_32kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<15, codedPieceCount) })
		b.Run("each_64kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<16, codedPieceCount) })
		b.Run("each_128kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<17, codedPieceCount) })
		b.Run("each_256kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<18, codedPieceCount) })
		b.Run("each_512kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<19, codedPieceCount) })
	})

	b.Run("256Pieces", func(b *testing.B) {
		var (
			pieceCount      uint64 = 1 << 8
			codedPieceCount uint64 = pieceCount + extraPieceCount
		)

		b.Run("each_1kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<10, codedPieceCount) })
		b.Run("each_2kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<11, codedPieceCount) })
		b.Run("each_4kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<12, codedPieceCount) })
		b.Run("each_8kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<13, codedPieceCount) })
		b.Run("each_16kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<14, codedPieceCount) })
		b.Run("each_32kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<15, codedPieceCount) })
		b.Run("each_64kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<16, codedPieceCount) })
		b.Run("each_128kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<17, codedPieceCount) })
		b.Run("each_256kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<18, codedPieceCount) })
		b.Run("each_512kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<19, codedPieceCount) })
	})

	b.Run("512Pieces", func(b *testing.B) {
		var (
			pieceCount      uint64 = 1 << 9
			codedPieceCount uint64 = pieceCount + extraPieceCount
		)

		b.Run("each_1kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<10, codedPieceCount) })
		b.Run("each_2kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<11, codedPieceCount) })
		b.Run("each_4kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<12, codedPieceCount) })
		b.Run("each_8kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<13, codedPieceCount) })
		b.Run("each_16kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<14, codedPieceCount) })
		b.Run("each_32kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<15, codedPieceCount) })
		b.Run("each_64kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<16, codedPieceCount) })
		b.Run("each_128kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<17, codedPieceCount) })
		b.Run("each_256kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<18, codedPieceCount) })
		b.Run("each_512kB", func(b *testing.B) { decoder_flow(b, pieceCount, 1<<19, codedPieceCount) })
	})
}

From cd192ed24d36c848e4db97bb0317e6c305464010 Mon Sep 17 00:00:00 2001
From: Anjan Roy
Date: Fri, 20 Aug 2021 12:43:44 +0530
Subject: [PATCH 13/14] use r/wmutex when attempting to mutate coded data
 matrix from multiple go-routines

---
 matrix/parallel_decoder_state.go      | 39 +++++++++++++++++++++++-----
 matrix/parallel_decoder_state_test.go |  8 ++++++
 2 files changed, 42 insertions(+), 5 deletions(-)

diff --git a/matrix/parallel_decoder_state.go b/matrix/parallel_decoder_state.go
index 1e27833..5a9ddff 100644
--- a/matrix/parallel_decoder_state.go
+++ b/matrix/parallel_decoder_state.go
@@ -3,6 +3,7 @@ package matrix
 
 import (
 	"context"
 	"runtime"
+	"sync"
 	"sync/atomic"
 
 	"github.com/cloud9-tools/go-galoisfield"
@@ -26,8 +27,11 @@ type ParallelDecoderState struct {
 	pieceLen uint64
 	// useful piece count i.e. linearly
 	// independent pieces decoder has received
-	useful uint64
-	coeffs, coded Matrix
+	useful        uint64
+	coeffs, coded Matrix
+	// because competing go-routines attempt to
+	// mutate `coded` data matrix
+	lockCoded *sync.RWMutex
 	workerQueue []*work
 
 func (p *ParallelDecoderState) supervise(ctx context.Context, cnfChan chan struct{}) {
 	// confirming worker is ready to run !
 	cnfChan <- struct{}{}
-	// how many pieces received
-	receivedCount := 0
 
 OUT:
 
 			// piece to be processed further, returning nil error !
 			req.err <- nil
-			receivedCount++
 
 			codedPiece := req.piece
 			p.coeffs = append(p.coeffs, codedPiece.Vector)
+
+			// -- starts --
+			// critical section of code, other
+			// go-routine might attempt to mutate
+			// data matrix at same time
+			p.lockCoded.Lock()
 			p.coded = append(p.coded, codedPiece.Piece)
+			p.lockCoded.Unlock()
+			// -- ends --
 
 			// index of current piece of interest
 			idx := uint64(len(p.coeffs) - 1)
 
 			p.coeffs = (p.coeffs)[:len(p.coeffs)-1]
 
+			// -- critical section of code begins --
+			p.lockCoded.Lock()
 			p.coded[idx] = nil
 			copy((p.coded)[idx:], (p.coded)[idx+1:])
 			p.coded = (p.coded)[:len(p.coded)-1]
+			p.lockCoded.Unlock()
+			// -- ends --
 
 			atomic.StoreUint64(&p.useful, uint64(p.coeffs.Rows()))
 			continue OUT
 
 			if p.IsDecoded() {
+				// safe reading
+				p.lockCoded.RLock()
 				req.resp <- p.coded[req.idx]
+				p.lockCoded.RUnlock()
+
 				continue OUT
 			}
 
-			req.resp <- p.coded[req.idx]
+			// safe reading
+			p.lockCoded.RLock()
+			req.resp <- p.coded[req.idx]
+			p.lockCoded.RUnlock()
+
 			continue OUT
 		}
 
 			switch w.op {
 			case SUB_AFTER_MULT:
+
+				p.lockCoded.RLock()
 				for i := wState.columnStart; i <= wState.columnEnd; i++ {
 					tmp := p.field.Mul(p.coded[w.srcRow][i], w.weight)
 					p.coded[w.dstRow][i] = p.field.Add(p.coded[w.dstRow][i], tmp)
 				}
+				p.lockCoded.RUnlock()
 
 			case DIVISION:
+
+				p.lockCoded.RLock()
 				for i := wState.columnStart; i <= wState.columnEnd; i++ {
 					p.coded[w.dstRow][i] = p.field.Div(p.coded[w.srcRow][i], w.weight)
 				}
+				p.lockCoded.RUnlock()
 
 			case STOP:
 				// supervisor signals decoding is complete !
@@ -387,6 +415,7 @@ func NewParallelDecoderState(ctx context.Context, pieceCount, pieceLen uint64) * pieceLen: pieceLen, coeffs: make([][]byte, 0, pieceCount), coded: make([][]byte, 0, pieceCount), + lockCoded: &sync.RWMutex{}, workerQueue: make([]*work, 0), supervisorAddPieceChan: make(chan *addRequest, pieceCount), supervisorGetPieceChan: make(chan *pieceRequest, 1), diff --git a/matrix/parallel_decoder_state_test.go b/matrix/parallel_decoder_state_test.go index 0069bc3..1c908e3 100644 --- a/matrix/parallel_decoder_state_test.go +++ b/matrix/parallel_decoder_state_test.go @@ -62,6 +62,14 @@ func TestParallelDecoderState(t *testing.T) { t.Logf("decoding completed in %s\n", time.Since(start)) t.Logf("crunched %d bytes of data\n", enc.DecodableLen()) + // intentionally delay decoded piece reading due to the fact + // sometimes strangely subset of decoded pieces doesn't match + // with original pieces, but when printed after line:77, it turns + // out to be same ! + // + // I need to investigate further ! + <-time.After(time.Second) + for i := uint64(0); i < pieceCount; i++ { d_piece, err := dec_state.GetPiece(i) if err != nil { From eeeaa9b1795118c28522db8f493cd49736de76c9 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Fri, 20 Aug 2021 12:59:01 +0530 Subject: [PATCH 14/14] fixed possible race condition in worker queue --- matrix/parallel_decoder_state.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/matrix/parallel_decoder_state.go b/matrix/parallel_decoder_state.go index 5a9ddff..d98e022 100644 --- a/matrix/parallel_decoder_state.go +++ b/matrix/parallel_decoder_state.go @@ -31,8 +31,12 @@ type ParallelDecoderState struct { coeffs, coded Matrix // because competing go-routines attempt to // mutate `coded` data matrix - lockCoded *sync.RWMutex - workerQueue []*work + lockCoded *sync.RWMutex + workerQueue []*work + // supervisor for work allocation & N-many workers + // for work execution attempts to concurrently read from/ + // write to queue --- making it safe + lockWorkerQueue *sync.RWMutex workerChans []chan uint64 supervisorAddPieceChan chan *addRequest supervisorGetPieceChan chan *pieceRequest @@ -71,8 +75,13 @@ type workerState struct { func (p *ParallelDecoderState) createWork(src, dst uint64, weight byte, op OP) { w := work{srcRow: src, dstRow: dst, weight: weight, op: op} + + // -- critical section begins -- + p.lockWorkerQueue.Lock() p.workerQueue = append(p.workerQueue, &w) idx := uint(len(p.workerQueue) - 1) + p.lockWorkerQueue.Unlock() + // -- ends -- for i := 0; i < len(p.workerChans); i++ { // it's blocking call, better to use buffered channel, @@ -271,7 +280,10 @@ OUT: break OUT case idx := <-wState.workerChan: + // safe reading + p.lockWorkerQueue.RLock() w := p.workerQueue[idx] + p.lockWorkerQueue.RUnlock() switch w.op { case SUB_AFTER_MULT: @@ -417,6 +429,7 @@ func NewParallelDecoderState(ctx context.Context, pieceCount, pieceLen uint64) * coded: make([][]byte, 0, pieceCount), lockCoded: &sync.RWMutex{}, workerQueue: make([]*work, 0), + lockWorkerQueue: &sync.RWMutex{}, supervisorAddPieceChan: make(chan *addRequest, pieceCount), supervisorGetPieceChan: make(chan *pieceRequest, 1), workerCompletedReportChan: make(chan struct{}, wc),