Properly escape null character in the consumer too
parent
693ae1ba0a
commit
ffa2faa420
|
@ -10,7 +10,6 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
"regexp"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -28,6 +27,7 @@ import (
|
||||||
"github.com/uabluerail/indexer/models"
|
"github.com/uabluerail/indexer/models"
|
||||||
"github.com/uabluerail/indexer/pds"
|
"github.com/uabluerail/indexer/pds"
|
||||||
"github.com/uabluerail/indexer/repo"
|
"github.com/uabluerail/indexer/repo"
|
||||||
|
"github.com/uabluerail/indexer/util/fix"
|
||||||
"github.com/uabluerail/indexer/util/resolver"
|
"github.com/uabluerail/indexer/util/resolver"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -211,12 +211,6 @@ func (c *Consumer) updateCursor(ctx context.Context, seq int64) error {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var postgresFixRegexp = regexp.MustCompile(`[^\\](\\\\)*(\\u0000)`)
|
|
||||||
|
|
||||||
func escapeNullCharForPostgres(b []byte) []byte {
|
|
||||||
return postgresFixRegexp.ReplaceAll(b, []byte(`$1<0x00>`))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader, first bool) error {
|
func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader, first bool) error {
|
||||||
log := zerolog.Ctx(ctx)
|
log := zerolog.Ctx(ctx)
|
||||||
|
|
||||||
|
@ -331,7 +325,7 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
|
||||||
// XXX: proper replacement of \u0000 would require full parsing of JSON
|
// XXX: proper replacement of \u0000 would require full parsing of JSON
|
||||||
// and recursive iteration over all string values, but this
|
// and recursive iteration over all string values, but this
|
||||||
// should work well enough for now.
|
// should work well enough for now.
|
||||||
Content: escapeNullCharForPostgres(v),
|
Content: fix.EscapeNullCharForPostgres(v),
|
||||||
AtRev: payload.Rev,
|
AtRev: payload.Rev,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
"github.com/uabluerail/indexer/models"
|
"github.com/uabluerail/indexer/models"
|
||||||
"github.com/uabluerail/indexer/pds"
|
"github.com/uabluerail/indexer/pds"
|
||||||
"github.com/uabluerail/indexer/repo"
|
"github.com/uabluerail/indexer/repo"
|
||||||
|
"github.com/uabluerail/indexer/util/fix"
|
||||||
"github.com/uabluerail/indexer/util/resolver"
|
"github.com/uabluerail/indexer/util/resolver"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -132,14 +133,6 @@ func (p *WorkerPool) worker(ctx context.Context, signal chan struct{}) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var postgresFixRegexp = regexp.MustCompile(`([^\\](\\\\)*)(\\u0000)+`)
|
|
||||||
|
|
||||||
func escapeNullCharForPostgres(b []byte) []byte {
|
|
||||||
return postgresFixRegexp.ReplaceAllFunc(b, func(b []byte) []byte {
|
|
||||||
return bytes.ReplaceAll(b, []byte(`\u0000`), []byte(`<0x00>`))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *WorkerPool) doWork(ctx context.Context, work WorkItem) error {
|
func (p *WorkerPool) doWork(ctx context.Context, work WorkItem) error {
|
||||||
log := zerolog.Ctx(ctx)
|
log := zerolog.Ctx(ctx)
|
||||||
defer close(work.signal)
|
defer close(work.signal)
|
||||||
|
@ -244,7 +237,7 @@ retry:
|
||||||
// XXX: proper replacement of \u0000 would require full parsing of JSON
|
// XXX: proper replacement of \u0000 would require full parsing of JSON
|
||||||
// and recursive iteration over all string values, but this
|
// and recursive iteration over all string values, but this
|
||||||
// should work well enough for now.
|
// should work well enough for now.
|
||||||
Content: escapeNullCharForPostgres(v),
|
Content: fix.EscapeNullCharForPostgres(v),
|
||||||
AtRev: newRev,
|
AtRev: newRev,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
package fix
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"regexp"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
	// postgresFixRegexp matches one complete backslash escape at a time:
	// either the six-byte sequence `\u0000`, or a backslash followed by any
	// single character. Because matches are consumed left to right and never
	// overlap, an escaped backslash (`\\`) is always swallowed as its own
	// token, so the `u0000` that may follow it is correctly seen as plain
	// text rather than as the tail of a null escape.
	postgresFixRegexp = regexp.MustCompile(`(?s)\\(?:u0000|.)`)

	// nullEscape is the JSON escape for U+0000 as it appears in raw JSON bytes.
	nullEscape = []byte(`\u0000`)
	// nullPlaceholder is the printable stand-in written instead of nullEscape.
	nullPlaceholder = []byte(`<0x00>`)
)

// EscapeNullCharForPostgres returns a copy of the raw JSON in b in which every
// `\u0000` escape is replaced with the literal text `<0x00>`, so the document
// can be stored in PostgreSQL, which does not accept U+0000 in JSON strings.
//
// Only genuine null escapes are rewritten: a `\u0000` preceded by an odd
// number of backslashes (for example `\\u0000`, which decodes to a backslash
// followed by the text "u0000") is left untouched. Unlike a "preceding
// non-backslash character" heuristic, this also handles an escape at the very
// start of the input and null escapes separated only by escaped backslashes
// (for example `\u0000\\\u0000`).
//
// The input slice is not modified.
func EscapeNullCharForPostgres(b []byte) []byte {
	return postgresFixRegexp.ReplaceAllFunc(b, func(esc []byte) []byte {
		if bytes.Equal(esc, nullEscape) {
			return nullPlaceholder
		}
		// Any other escape sequence (`\\`, `\"`, `\n`, `\u`, ...) is kept verbatim.
		return esc
	})
}
|
|
@ -1,4 +1,4 @@
|
||||||
package main
|
package fix
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -18,7 +18,7 @@ func TestPostgresFix(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
got := escapeNullCharForPostgres([]byte(tc.input))
|
got := EscapeNullCharForPostgres([]byte(tc.input))
|
||||||
if string(got) != tc.want {
|
if string(got) != tc.want {
|
||||||
t.Errorf("escapeNullCharForPostgres(%s) = %s, want %s", tc.input, string(got), tc.want)
|
t.Errorf("escapeNullCharForPostgres(%s) = %s, want %s", tc.input, string(got), tc.want)
|
||||||
}
|
}
|
Loading…
Reference in New Issue