add files
This commit is contained in:
commit
04e29ed525
7 changed files with 1337 additions and 0 deletions
319
main.go
Normal file
319
main.go
Normal file
|
@ -0,0 +1,319 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"image/jpeg"
|
||||
"io"
|
||||
"log"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.zio.sh/astra/bsky2tg/bsky"
|
||||
tgbotapi "github.com/OvyFlash/telegram-bot-api"
|
||||
|
||||
// apibsky "github.com/bluesky-social/indigo/api/bsky"
|
||||
"github.com/bluesky-social/jetstream/pkg/client"
|
||||
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
|
||||
"github.com/bluesky-social/jetstream/pkg/models"
|
||||
|
||||
vidio "git.zio.sh/astra/Vidio"
|
||||
)
|
||||
|
||||
const (
|
||||
serverAddr = "wss://jetstream2.us-west.bsky.network/subscribe"
|
||||
// serverAddr = "wss://stream.zio.blue/subscribe"
|
||||
postFormat = "%s\n—\n<a href=\"https://bsky.app/profile/%s/post/%s\">🦋 @%s</a>"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
seenSeqs map[int64]struct{}
|
||||
tg *tgbotapi.BotAPI
|
||||
bsky *bsky.BSky
|
||||
}
|
||||
|
||||
func main() {
|
||||
var handle = os.Getenv("BSKY_HANDLE")
|
||||
var password = os.Getenv("BSKY_PASSWORD")
|
||||
bskyClient := bsky.NewBSky()
|
||||
err := bskyClient.Auth([]string{handle, password})
|
||||
if err != nil {
|
||||
log.Fatal(err, ". please set BSKY_HANDLE and BSKY_PASSWORD env variables")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
||||
Level: slog.LevelDebug.Level(),
|
||||
})))
|
||||
|
||||
logger := slog.Default()
|
||||
config := client.DefaultClientConfig()
|
||||
config.WebsocketURL = serverAddr
|
||||
config.WantedCollections = []string{"app.bsky.feed.post"}
|
||||
config.WantedDids = []string{bskyClient.Bluesky.Cfg.DID}
|
||||
config.Compress = true
|
||||
|
||||
h := &handler{
|
||||
seenSeqs: make(map[int64]struct{}),
|
||||
bsky: bskyClient,
|
||||
}
|
||||
|
||||
scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent)
|
||||
|
||||
c, err := client.NewClient(config, logger, scheduler)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create client: %v", err)
|
||||
}
|
||||
|
||||
bot, err := tgbotapi.NewBotAPIWithAPIEndpoint(os.Getenv("TG_TOKEN"), "https://bot.astra.blue/bot%s/%s")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
h.tg = bot
|
||||
|
||||
if os.Getenv("TG_CHANNEL_ID") == "" {
|
||||
log.Fatal("TG_CHANNEL_ID is not set")
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------
|
||||
// file, err := os.Open("posts.json")
|
||||
// if err != nil {
|
||||
// fmt.Printf("Error opening file: %v\n", err)
|
||||
// return
|
||||
// }
|
||||
// defer file.Close()
|
||||
// byteValue, err := io.ReadAll(file)
|
||||
// if err != nil {
|
||||
// fmt.Printf("Error reading file: %v\n", err)
|
||||
// return
|
||||
// }
|
||||
|
||||
// var posts = struct {
|
||||
// Records []struct {
|
||||
// URI string `json:"uri"`
|
||||
// CID string `json:"cid"`
|
||||
// Value *bsky.Post `json:"value"`
|
||||
// } `json:"records"`
|
||||
// }{}
|
||||
|
||||
// // 4. Unmarshal (decode) the JSON data into the struct
|
||||
// err = json.Unmarshal(byteValue, &posts)
|
||||
// if err != nil {
|
||||
// fmt.Printf("Error unmarshaling JSON: %v\n", err)
|
||||
// return
|
||||
// }
|
||||
// for _, post := range posts.Records {
|
||||
// log.Printf("post: %s\n", post.Value.ProcessFacets())
|
||||
// s, _ := json.Marshal(post.Value)
|
||||
// h.ProcessPost(&models.Event{Did: bskyClient.Bluesky.Cfg.DID, Commit: &models.Commit{
|
||||
// Record: s,
|
||||
// RKey: strings.Split(post.URI, "/")[4],
|
||||
// CID: post.CID,
|
||||
// Collection: "app.bsky.feed.post",
|
||||
// }})
|
||||
// time.Sleep(time.Second * 2)
|
||||
// }
|
||||
// return
|
||||
// ------------------------------------------------------------------------------
|
||||
|
||||
cursor := time.Now().UnixMicro()
|
||||
restartCount := 0
|
||||
loop:
|
||||
if err := c.ConnectAndRead(ctx, &cursor); err != nil {
|
||||
log.Printf("failed to connect: %v. retrying", err)
|
||||
if restartCount >= 3 {
|
||||
log.Print("restart count limit hit, setting cursor to now\n")
|
||||
restartCount = 0
|
||||
cursor = time.Now().UnixMicro()
|
||||
} else {
|
||||
restartCount += 1
|
||||
cursor = int64(bskyClient.Bluesky.Cfg.Cursor)
|
||||
}
|
||||
goto loop
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error {
|
||||
if event.Commit == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if event.Commit.Operation == models.CommitOperationCreate ||
|
||||
event.Commit.Operation == models.CommitOperationUpdate {
|
||||
h.bsky.Bluesky.Cfg.Cursor = event.TimeUS + 1 // +1 to not show same post
|
||||
bsky.PersistAuthSession(h.bsky.Bluesky.Cfg)
|
||||
h.ProcessPost(event)
|
||||
} else if event.Commit.Operation == models.CommitOperationDelete {
|
||||
h.bsky.Bluesky.Cfg.Cursor = event.TimeUS + 1 // +1 to not show same post
|
||||
bsky.PersistAuthSession(h.bsky.Bluesky.Cfg)
|
||||
r, e := h.bsky.Bluesky.GetTelegramData(event.Commit.RKey)
|
||||
if e == "" {
|
||||
m := tgbotapi.NewDeleteMessage(r.ChannelID, r.MessageID)
|
||||
h.tg.Send(m)
|
||||
h.bsky.Bluesky.DeleteRecord([]string{event.Commit.RKey, event.Did, event.Commit.Collection})
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *handler) ProcessPost(event *models.Event) error {
|
||||
ps, _ := h.bsky.ParsePost(event.Commit.Record)
|
||||
po := ps.GetEmbeds()
|
||||
cid, _ := strconv.ParseInt(os.Getenv("TG_CHANNEL_ID"), 10, 64)
|
||||
|
||||
if ps.IsReply() { //|| ps.IsQuotePost() {
|
||||
// don't want to post replies to channel
|
||||
return nil
|
||||
}
|
||||
|
||||
var captionText string
|
||||
if ps.IsQuotePost() {
|
||||
if ps.Embed.Record.Type == "app.bsky.embed.record" {
|
||||
handle, _ := h.bsky.GetHandleFromDID(strings.Split(ps.Embed.Record.Record.URI, "/")[2])
|
||||
captionText = fmt.Sprintf(
|
||||
"<blockquote>%s</blockquote>\n<a href=\"https://bsky.app/profile/%s/post/%s\">➡️ @%s</a>\n—\n<a href=\"https://bsky.app/profile/%s/post/%s\">🦋 @%s</a>",
|
||||
ps.ProcessFacets(),
|
||||
strings.Split(ps.Embed.Record.Record.URI, "/")[2],
|
||||
strings.Split(ps.Embed.Record.Record.URI, "/")[4],
|
||||
handle,
|
||||
event.Did,
|
||||
event.Commit.RKey,
|
||||
h.bsky.Bluesky.Cfg.Handle)
|
||||
} else {
|
||||
handle, _ := h.bsky.GetHandleFromDID(strings.Split(ps.Embed.Record.URI, "/")[2])
|
||||
captionText = fmt.Sprintf(
|
||||
"<blockquote>%s</blockquote>\n<a href=\"https://bsky.app/profile/%s/post/%s\">➡️ @%s</a>\n—\n<a href=\"https://bsky.app/profile/%s/post/%s\">🦋 @%s</a>",
|
||||
ps.ProcessFacets(),
|
||||
strings.Split(ps.Embed.Record.URI, "/")[2],
|
||||
strings.Split(ps.Embed.Record.URI, "/")[4],
|
||||
handle,
|
||||
event.Did,
|
||||
event.Commit.RKey,
|
||||
h.bsky.Bluesky.Cfg.Handle)
|
||||
}
|
||||
}
|
||||
|
||||
if captionText == "" {
|
||||
if ps.ProcessFacets() != "" {
|
||||
captionText = fmt.Sprintf(postFormat, ps.ProcessFacets(), h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle)
|
||||
} else {
|
||||
captionText = fmt.Sprintf("<a href=\"https://bsky.app/profile/%s/post/%s\">🦋 @%s</a>", h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle)
|
||||
}
|
||||
}
|
||||
|
||||
// post has media
|
||||
if len((*po)) != 0 {
|
||||
mediaGroup := []tgbotapi.InputMedia{}
|
||||
if (*po)[0].Type == "external" {
|
||||
tenorGif := tgbotapi.NewInputMediaVideo(tgbotapi.FileURL((*po)[0].URI)) // is most likely gif from Tenor
|
||||
tenorGif.Caption = captionText
|
||||
tenorGif.ParseMode = tgbotapi.ModeHTML
|
||||
mediaGroup = append(mediaGroup, &tenorGif)
|
||||
} else {
|
||||
for _, media := range *po {
|
||||
switch media.Type {
|
||||
case "image":
|
||||
mediaAdd := tgbotapi.NewInputMediaPhoto(tgbotapi.FileURL(buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI)))
|
||||
if len(mediaGroup) == 0 {
|
||||
mediaAdd.Caption = captionText
|
||||
mediaAdd.ParseMode = tgbotapi.ModeHTML
|
||||
}
|
||||
mediaGroup = append(mediaGroup, &mediaAdd)
|
||||
case "video":
|
||||
log.Printf("Fetching video: %s\n", buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI))
|
||||
resp, _ := http.Get(buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI))
|
||||
defer resp.Body.Close()
|
||||
f, _ := os.Create(media.URI + ".mp4")
|
||||
io.Copy(f, resp.Body)
|
||||
f.Seek(0, 0)
|
||||
mediaAdd := tgbotapi.NewInputMediaVideo(tgbotapi.FileReader{Name: "video.mp4", Reader: f})
|
||||
metadata, err := getVideoMetadata(f.Name())
|
||||
if err != nil {
|
||||
log.Printf("Unable to read video metadata: %s\n", buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI))
|
||||
break
|
||||
}
|
||||
mediaAdd.SupportsStreaming = true
|
||||
mediaAdd.Height = metadata.Height()
|
||||
mediaAdd.Width = metadata.Width()
|
||||
mediaAdd.Duration = int(metadata.Duration())
|
||||
|
||||
frames, _ := metadata.ReadFrames(0)
|
||||
var buf bytes.Buffer
|
||||
jpeg.Encode(&buf, frames[0], &jpeg.Options{Quality: 90})
|
||||
mediaAdd.Thumb = tgbotapi.FileBytes{Name: "thumb.jpg", Bytes: buf.Bytes()}
|
||||
if len(mediaGroup) == 0 {
|
||||
mediaAdd.Caption = captionText
|
||||
mediaAdd.ParseMode = tgbotapi.ModeHTML
|
||||
}
|
||||
os.Remove(media.URI + ".mp4")
|
||||
mediaGroup = append(mediaGroup, &mediaAdd)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(mediaGroup) == 0 {
|
||||
log.Print("No mediaGroup to send, see previous error")
|
||||
} else {
|
||||
resp, _ := h.tg.SendMediaGroup(tgbotapi.NewMediaGroup(cid, mediaGroup))
|
||||
uri, cid := getLink(event)
|
||||
h.bsky.Bluesky.CommitTelegramResponse(&bsky.TelegramRecord{
|
||||
ChannelID: resp[0].Chat.ID,
|
||||
MessageID: resp[0].MessageID,
|
||||
Link: &bsky.Link{
|
||||
Cid: cid,
|
||||
URI: uri,
|
||||
},
|
||||
}, event.Commit.RKey)
|
||||
}
|
||||
} else {
|
||||
m := tgbotapi.MessageConfig{}
|
||||
if captionText == "" {
|
||||
m = tgbotapi.NewMessage(cid, fmt.Sprintf(postFormat, ps.ProcessFacets(), h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle))
|
||||
} else {
|
||||
m = tgbotapi.NewMessage(cid, captionText)
|
||||
}
|
||||
m.ParseMode = tgbotapi.ModeHTML
|
||||
if ps.IsQuotePost() {
|
||||
m.LinkPreviewOptions = tgbotapi.LinkPreviewOptions{
|
||||
IsDisabled: false,
|
||||
URL: fmt.Sprintf("https://bsky.app/profile/%s/post/%s",
|
||||
strings.Split(ps.Embed.Record.URI, "/")[2],
|
||||
strings.Split(ps.Embed.Record.URI, "/")[4]),
|
||||
PreferSmallMedia: true,
|
||||
ShowAboveText: true,
|
||||
}
|
||||
} else {
|
||||
m.LinkPreviewOptions = tgbotapi.LinkPreviewOptions{IsDisabled: true}
|
||||
}
|
||||
resp, _ := h.tg.Send(m)
|
||||
uri, cid := getLink(event)
|
||||
h.bsky.Bluesky.CommitTelegramResponse(&bsky.TelegramRecord{
|
||||
ChannelID: resp.Chat.ID,
|
||||
MessageID: resp.MessageID,
|
||||
Link: &bsky.Link{
|
||||
Cid: cid,
|
||||
URI: uri,
|
||||
},
|
||||
}, event.Commit.RKey)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildBlobURL(server string, did string, cid string) string {
|
||||
return server + "/xrpc/com.atproto.sync.getBlob?did=" + url.QueryEscape(did) + "&cid=" + url.QueryEscape(cid)
|
||||
}
|
||||
|
||||
func getLink(event *models.Event) (string, string) {
|
||||
return fmt.Sprintf("at://%s/%s/%s", event.Did, event.Commit.Collection, event.Commit.RKey), event.Commit.CID
|
||||
}
|
||||
|
||||
func getVideoMetadata(fname string) (video *vidio.Video, err error) {
|
||||
video, err = vidio.NewVideo(fname)
|
||||
return
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue