Skip to content

Commit a19f720

Browse files
committed
add compression/decompression pipeline
1 parent 3eb619f commit a19f720

File tree

6 files changed

+111
-15
lines changed

6 files changed

+111
-15
lines changed

internal/compress/compress.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package compress
2+
3+
import (
4+
"compress/flate"
5+
"fmt"
6+
"io"
7+
"io/ioutil"
8+
9+
"bytes"
10+
11+
"github.com/golang/snappy"
12+
)
13+
14+
func SnappyDecompress(compressedMsg []byte) ([]byte, error) {
15+
body, err := snappy.Decode(nil, compressedMsg)
16+
if err != nil {
17+
return nil, fmt.Errorf("error in decode snappy compressed message: %v", err)
18+
}
19+
20+
return body, nil
21+
}
22+
23+
func DeflateDecompress(compressedMsg []byte) ([]byte, error) {
24+
br := bytes.NewReader(compressedMsg)
25+
fr := flate.NewReader(br)
26+
defer fr.Close()
27+
body, err := ioutil.ReadAll(fr)
28+
if err != nil && !(err == io.ErrUnexpectedEOF && br.Len() == 0) {
29+
return nil, fmt.Errorf("error in decode deflate compressed message: %v", err)
30+
}
31+
32+
return body, nil
33+
}

nsqd/client_v2.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,10 @@ type clientV2 struct {
9696
IdentifyEventChan chan identifyEvent
9797
SubEventChan chan *Channel
9898

99-
TLS int32
100-
Snappy int32
101-
Deflate int32
99+
TLS int32
100+
Snappy int32
101+
Deflate int32
102+
DeflateLevel int32
102103

103104
// re-usable buffer for reading the 4-byte lengths off the wire
104105
lenBuf [4]byte
@@ -511,6 +512,7 @@ func (c *clientV2) UpgradeDeflate(level int) error {
511512
c.Writer = bufio.NewWriterSize(fw, c.OutputBufferSize)
512513

513514
atomic.StoreInt32(&c.Deflate, 1)
515+
atomic.StoreInt32(&c.DeflateLevel, int32(level))
514516

515517
return nil
516518
}

nsqd/http.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,27 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout
222222
}
223223
}
224224

225+
var isSnappy, isDeflate bool
226+
if value, exist := req.Header["Content-Encoding"]; exist {
227+
for _, encoding := range value {
228+
if encoding == "snappy" {
229+
isSnappy = true
230+
}
231+
232+
if encoding == "deflate" {
233+
isDeflate = true
234+
}
235+
}
236+
}
237+
238+
if isSnappy && isDeflate {
239+
return nil, http_api.Err{400, "INVALID_CONTENT_ENCODING"}
240+
}
241+
225242
msg := NewMessage(topic.GenerateID(), body)
226243
msg.deferred = deferred
244+
msg.snappyCompressed = isSnappy
245+
msg.deflateCompressed = isDeflate
227246
err = topic.PutMessage(msg)
228247
if err != nil {
229248
return nil, http_api.Err{503, "EXITING"}
@@ -248,6 +267,23 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou
248267
return nil, err
249268
}
250269

270+
var isSnappy, isDeflate bool
271+
if value, exist := req.Header["Content-Encoding"]; exist {
272+
for _, encoding := range value {
273+
if encoding == "snappy" {
274+
isSnappy = true
275+
}
276+
277+
if encoding == "deflate" {
278+
isDeflate = true
279+
}
280+
}
281+
}
282+
283+
if isSnappy && isDeflate {
284+
return nil, http_api.Err{400, "INVALID_CONTENT_ENCODING"}
285+
}
286+
251287
// text mode is default, but unrecognized binary opt considered true
252288
binaryMode := false
253289
if vals, ok := reqParams["binary"]; ok {
@@ -302,6 +338,11 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou
302338
}
303339
}
304340

341+
for _, msg := range msgs {
342+
msg.snappyCompressed = isSnappy
343+
msg.deflateCompressed = isDeflate
344+
}
345+
305346
err = topic.PutMessages(msgs)
306347
if err != nil {
307348
return nil, http_api.Err{503, "EXITING"}

nsqd/message.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ type Message struct {
2727
pri int64
2828
index int
2929
deferred time.Duration
30+
31+
snappyCompressed bool
32+
deflateCompressed bool
3033
}
3134

3235
func NewMessage(id MessageID, body []byte) *Message {

nsqd/protocol_v2.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"time"
1414
"unsafe"
1515

16+
"github.com/nsqio/nsq/internal/compress"
1617
"github.com/nsqio/nsq/internal/protocol"
1718
"github.com/nsqio/nsq/internal/version"
1819
)
@@ -126,7 +127,28 @@ func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error {
126127
p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): writing msg(%s) to client(%s) - %s", msg.ID, client, msg.Body)
127128
var buf = &bytes.Buffer{}
128129

129-
_, err := msg.WriteTo(buf)
130+
adjustedBody := msg.Body
131+
var err error
132+
if msg.snappyCompressed {
133+
// message is snappy compressed
134+
adjustedBody, err = compress.SnappyDecompress(msg.Body)
135+
if err != nil {
136+
p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): decompress msg(%s) with snappy error", msg.ID)
137+
return err
138+
}
139+
msg.snappyCompressed = false
140+
} else if msg.deflateCompressed {
141+
// message is deflate compressed
142+
adjustedBody, err = compress.DeflateDecompress(msg.Body)
143+
if err != nil {
144+
p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): decompress msg(%s) with deflate error", msg.ID)
145+
return err
146+
}
147+
msg.deflateCompressed = false
148+
}
149+
150+
msg.Body = adjustedBody
151+
_, err = msg.WriteTo(buf)
130152
if err != nil {
131153
return err
132154
}

nsqd/topic.go

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -307,17 +307,12 @@ func (t *Topic) messagePump() {
307307
goto exit
308308
}
309309

310-
for i, channel := range chans {
311-
chanMsg := msg
312-
// copy the message because each channel
313-
// needs a unique instance but...
314-
// fastpath to avoid copy if its the first channel
315-
// (the topic already created the first copy)
316-
if i > 0 {
317-
chanMsg = NewMessage(msg.ID, msg.Body)
318-
chanMsg.Timestamp = msg.Timestamp
319-
chanMsg.deferred = msg.deferred
320-
}
310+
for _, channel := range chans {
311+
chanMsg := NewMessage(msg.ID, msg.Body)
312+
chanMsg.Timestamp = msg.Timestamp
313+
chanMsg.deferred = msg.deferred
314+
chanMsg.deflateCompressed = msg.deflateCompressed
315+
chanMsg.snappyCompressed = msg.snappyCompressed
321316
if chanMsg.deferred != 0 {
322317
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
323318
continue

0 commit comments

Comments
 (0)